1/* Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2 * Use of this source code is governed by a BSD-style license that can be
3 * found in the LICENSE file.
4 */
5
6#ifndef _GNU_SOURCE
7#define _GNU_SOURCE /* for ppoll */
8#endif
9
10#include <pthread.h>
11#include <poll.h>
12#include <sys/param.h>
13#include <syslog.h>
14
15#include "cras_audio_area.h"
16#include "audio_thread_log.h"
17#include "cras_config.h"
18#include "cras_fmt_conv.h"
19#include "cras_iodev.h"
20#include "cras_rstream.h"
21#include "cras_server_metrics.h"
22#include "cras_system_state.h"
23#include "cras_types.h"
24#include "cras_util.h"
25#include "dev_stream.h"
26#include "audio_thread.h"
27#include "utlist.h"
28
29#define MIN_PROCESS_TIME_US 500 /* 0.5ms - min amount of time to mix/src. */
30#define SLEEP_FUZZ_FRAMES 10 /* # to consider "close enough" to sleep frames. */
31#define MIN_READ_WAIT_US 2000 /* 2ms */
32static const struct timespec playback_wake_fuzz_ts = {
33	0, 500 * 1000 /* 500 usec. */
34};
35
36/* Messages that can be sent from the main context to the audio thread. */
37enum AUDIO_THREAD_COMMAND {
38	AUDIO_THREAD_ADD_OPEN_DEV,
39	AUDIO_THREAD_RM_OPEN_DEV,
40	AUDIO_THREAD_ADD_STREAM,
41	AUDIO_THREAD_DISCONNECT_STREAM,
42	AUDIO_THREAD_STOP,
43	AUDIO_THREAD_DUMP_THREAD_INFO,
44	AUDIO_THREAD_DRAIN_STREAM,
45	AUDIO_THREAD_CONFIG_GLOBAL_REMIX,
46	AUDIO_THREAD_DEV_START_RAMP,
47	AUDIO_THREAD_REMOVE_CALLBACK,
48};
49
50struct audio_thread_msg {
51	size_t length;
52	enum AUDIO_THREAD_COMMAND id;
53};
54
55struct audio_thread_config_global_remix {
56	struct audio_thread_msg header;
57	struct cras_fmt_conv *fmt_conv;
58};
59
60struct audio_thread_open_device_msg {
61	struct audio_thread_msg header;
62	struct cras_iodev *dev;
63};
64
65struct audio_thread_rm_callback_msg {
66	struct audio_thread_msg header;
67	int fd;
68};
69
70struct audio_thread_add_rm_stream_msg {
71	struct audio_thread_msg header;
72	struct cras_rstream *stream;
73	struct cras_iodev **devs;
74	unsigned int num_devs;
75};
76
77struct audio_thread_dump_debug_info_msg {
78	struct audio_thread_msg header;
79	struct audio_debug_info *info;
80};
81
82struct audio_thread_dev_start_ramp_msg {
83	struct audio_thread_msg header;
84	struct cras_iodev *dev;
85	enum CRAS_IODEV_RAMP_REQUEST request;
86};
87
88/* Audio thread logging. */
89struct audio_thread_event_log *atlog;
90/* Global fmt converter used to remix output channels. */
91static struct cras_fmt_conv *remix_converter = NULL;
92
93static struct iodev_callback_list *iodev_callbacks;
94static struct timespec longest_wake;
95
96struct iodev_callback_list {
97	int fd;
98	int is_write;
99	int enabled;
100	thread_callback cb;
101	void *cb_data;
102	struct pollfd *pollfd;
103	struct iodev_callback_list *prev, *next;
104};
105
106static void _audio_thread_add_callback(int fd, thread_callback cb,
107				       void *data, int is_write)
108{
109	struct iodev_callback_list *iodev_cb;
110
111	/* Don't add iodev_cb twice */
112	DL_FOREACH(iodev_callbacks, iodev_cb)
113		if (iodev_cb->fd == fd && iodev_cb->cb_data == data)
114			return;
115
116	iodev_cb = (struct iodev_callback_list *)calloc(1, sizeof(*iodev_cb));
117	iodev_cb->fd = fd;
118	iodev_cb->cb = cb;
119	iodev_cb->cb_data = data;
120	iodev_cb->enabled = 1;
121	iodev_cb->is_write = is_write;
122
123	DL_APPEND(iodev_callbacks, iodev_cb);
124}
125
126void audio_thread_add_callback(int fd, thread_callback cb,
127				void *data)
128{
129	_audio_thread_add_callback(fd, cb, data, 0);
130}
131
132void audio_thread_add_write_callback(int fd, thread_callback cb,
133				     void *data)
134{
135	_audio_thread_add_callback(fd, cb, data, 1);
136}
137
138void audio_thread_rm_callback(int fd)
139{
140	struct iodev_callback_list *iodev_cb;
141
142	DL_FOREACH(iodev_callbacks, iodev_cb) {
143		if (iodev_cb->fd == fd) {
144			DL_DELETE(iodev_callbacks, iodev_cb);
145			free(iodev_cb);
146			return;
147		}
148	}
149}
150
151void audio_thread_enable_callback(int fd, int enabled)
152{
153	struct iodev_callback_list *iodev_cb;
154
155	DL_FOREACH(iodev_callbacks, iodev_cb) {
156		if (iodev_cb->fd == fd) {
157			iodev_cb->enabled = !!enabled;
158			return;
159		}
160	}
161}
162
163/* Sends a response (error code) from the audio thread to the main thread.
164 * Indicates that the last message sent to the audio thread has been handled
165 * with an error code of rc.
166 * Args:
167 *    thread - thread responding to command.
168 *    rc - Result code to send back to the main thread.
169 * Returns:
170 *    The number of bytes written to the main thread.
171 */
172static int audio_thread_send_response(struct audio_thread *thread, int rc)
173{
174	return write(thread->to_main_fds[1], &rc, sizeof(rc));
175}
176
177/* Reads a command from the main thread.  Called from the playback/capture
178 * thread.  This will read the next available command from the main thread and
179 * put it in buf.
180 * Args:
181 *    thread - thread reading the command.
182 *    buf - Message is stored here on return.
183 *    max_len - maximum length of message to put into buf.
184 * Returns:
185 *    0 on success, negative error code on failure.
186 */
187static int audio_thread_read_command(struct audio_thread *thread,
188				     uint8_t *buf,
189				     size_t max_len)
190{
191	int to_read, nread, rc;
192	struct audio_thread_msg *msg = (struct audio_thread_msg *)buf;
193
194	/* Get the length of the message first */
195	nread = read(thread->to_thread_fds[0], buf, sizeof(msg->length));
196	if (nread < 0)
197		return nread;
198	if (msg->length > max_len)
199		return -ENOMEM;
200
201	to_read = msg->length - nread;
202	rc = read(thread->to_thread_fds[0], &buf[0] + nread, to_read);
203	if (rc < 0)
204		return rc;
205	return 0;
206}
207
208/* Builds an initial buffer to avoid an underrun. Adds min_level of latency. */
209static void fill_odevs_zeros_min_level(struct cras_iodev *odev)
210{
211	cras_iodev_fill_odev_zeros(odev, odev->min_buffer_level);
212}
213
214static void thread_rm_open_adev(struct audio_thread *thread,
215				struct open_dev *adev);
216
217static void delete_stream_from_dev(struct cras_iodev *dev,
218				   struct cras_rstream *stream)
219{
220	struct dev_stream *out;
221
222	out = cras_iodev_rm_stream(dev, stream);
223	if (out)
224		dev_stream_destroy(out);
225}
226
227/* Append a new stream to a specified set of iodevs. */
228static int append_stream(struct audio_thread *thread,
229			 struct cras_rstream *stream,
230			 struct cras_iodev **iodevs,
231			 unsigned int num_iodevs)
232{
233	struct open_dev *open_dev;
234	struct cras_iodev *dev;
235	struct dev_stream *out;
236	struct timespec init_cb_ts;
237	const struct timespec *stream_ts;
238	unsigned int i;
239	int rc = 0;
240
241	for (i = 0; i < num_iodevs; i++) {
242		DL_SEARCH_SCALAR(thread->open_devs[stream->direction], open_dev,
243				dev, iodevs[i]);
244		if (!open_dev)
245			continue;
246
247		dev = iodevs[i];
248		DL_SEARCH_SCALAR(dev->streams, out, stream, stream);
249		if (out)
250			continue;
251
252		/* If open device already has stream, get the first stream
253		 * and use its next callback time to align with. Otherwise
254		 * use the timestamp now as the initial callback time for
255		 * new stream.
256		 */
257		if (dev->streams &&
258		    (stream_ts = dev_stream_next_cb_ts(dev->streams)))
259			init_cb_ts = *stream_ts;
260		else
261			clock_gettime(CLOCK_MONOTONIC_RAW, &init_cb_ts);
262
263		out = dev_stream_create(stream, dev->info.idx,
264					dev->ext_format, dev, &init_cb_ts);
265		if (!out) {
266			rc = -EINVAL;
267			break;
268		}
269
270		/* When the first input stream is added, flush the input buffer
271		 * so that we can read from multiple input devices of the same
272		 * buffer level.
273		 */
274		if ((stream->direction == CRAS_STREAM_INPUT) && !dev->streams) {
275			int num_flushed = dev->flush_buffer(dev);
276			if (num_flushed < 0) {
277				rc = num_flushed;
278				break;
279			}
280		}
281
282		cras_iodev_add_stream(dev, out);
283
284		/* For multiple inputs case, if the new stream is not the first
285		 * one to append, copy the 1st stream's offset to it so that
286		 * future read offsets can be aligned across all input streams
287		 * to avoid the deadlock scenario when multiple streams reading
288		 * from multiple devices.
289		 */
290		if ((stream->direction == CRAS_STREAM_INPUT) &&
291		    (dev->streams != out)) {
292			unsigned int offset =
293				cras_iodev_stream_offset(dev, dev->streams);
294			if (offset > stream->cb_threshold)
295				offset = stream->cb_threshold;
296			cras_iodev_stream_written(dev, out, offset);
297
298			offset = cras_rstream_dev_offset(dev->streams->stream,
299							 dev->info.idx);
300			if (offset > stream->cb_threshold)
301				offset = stream->cb_threshold;
302			cras_rstream_dev_offset_update(stream, offset,
303						       dev->info.idx);
304		}
305	}
306
307	if (rc) {
308		DL_FOREACH(thread->open_devs[stream->direction], open_dev) {
309			dev = open_dev->dev;
310			DL_SEARCH_SCALAR(dev->streams, out, stream, stream);
311			if (!out)
312				continue;
313
314			cras_iodev_rm_stream(dev, stream);
315			dev_stream_destroy(out);
316		}
317	}
318
319	return rc;
320}
321
322/* Handles messages from main thread to add a new active device. */
323static int thread_add_open_dev(struct audio_thread *thread,
324			       struct cras_iodev *iodev)
325{
326	struct open_dev *adev;
327
328	DL_SEARCH_SCALAR(thread->open_devs[iodev->direction],
329			 adev, dev, iodev);
330	if (adev)
331		return -EEXIST;
332
333	adev = (struct open_dev *)calloc(1, sizeof(*adev));
334	adev->dev = iodev;
335
336	/*
337	 * Start output devices by padding the output. This avoids a burst of
338	 * audio callbacks when the stream starts
339	 */
340	if (iodev->direction == CRAS_STREAM_OUTPUT)
341		fill_odevs_zeros_min_level(iodev);
342	else
343		adev->input_streaming = 0;
344
345	ATLOG(atlog,
346				    AUDIO_THREAD_DEV_ADDED,
347				    iodev->info.idx, 0, 0);
348
349	DL_APPEND(thread->open_devs[iodev->direction], adev);
350
351	return 0;
352}
353
354static struct open_dev *find_adev(struct open_dev *adev_list,
355				  struct cras_iodev *dev)
356{
357	struct open_dev *adev;
358	DL_FOREACH(adev_list, adev)
359		if (adev->dev == dev)
360			return adev;
361	return NULL;
362}
363
364static void thread_rm_open_adev(struct audio_thread *thread,
365				struct open_dev *dev_to_rm)
366{
367	enum CRAS_STREAM_DIRECTION dir = dev_to_rm->dev->direction;
368	struct open_dev *adev;
369	struct dev_stream *dev_stream;
370
371	/* Do nothing if dev_to_rm wasn't already in the active dev list. */
372	adev = find_adev(thread->open_devs[dir], dev_to_rm->dev);
373	if (!adev)
374		return;
375
376	DL_DELETE(thread->open_devs[dir], dev_to_rm);
377
378	ATLOG(atlog,
379				    AUDIO_THREAD_DEV_REMOVED,
380				    dev_to_rm->dev->info.idx, 0, 0);
381
382	DL_FOREACH(dev_to_rm->dev->streams, dev_stream) {
383		cras_iodev_rm_stream(dev_to_rm->dev, dev_stream->stream);
384		dev_stream_destroy(dev_stream);
385	}
386
387	free(dev_to_rm);
388}
389
390/* Handles messages from the main thread to remove an active device. */
391static int thread_rm_open_dev(struct audio_thread *thread,
392			      struct cras_iodev *iodev)
393{
394	struct open_dev *adev = find_adev(
395			thread->open_devs[iodev->direction], iodev);
396	if (!adev)
397		return -EINVAL;
398
399	thread_rm_open_adev(thread, adev);
400	return 0;
401}
402
403/* Handles messages from the main thread to start ramping on a device. */
404static int thread_dev_start_ramp(struct audio_thread *thread,
405				 struct cras_iodev *iodev,
406				 enum CRAS_IODEV_RAMP_REQUEST request)
407{
408	/* Do nothing if device wasn't already in the active dev list. */
409	struct open_dev *adev = find_adev(
410			thread->open_devs[iodev->direction], iodev);
411	if (!adev)
412		return -EINVAL;
413	return cras_iodev_start_ramp(iodev, request);
414}
415
416
417/* Return non-zero if the stream is attached to any device. */
418static int thread_find_stream(struct audio_thread *thread,
419			      struct cras_rstream *rstream)
420{
421	struct open_dev *open_dev;
422	struct dev_stream *s;
423
424	DL_FOREACH(thread->open_devs[rstream->direction], open_dev) {
425		DL_FOREACH(open_dev->dev->streams, s) {
426			if (s->stream == rstream)
427				return 1;
428		}
429	}
430	return 0;
431}
432
433/* Remove stream from the audio thread. If this is the last stream to be
434 * removed close the device.
435 */
436static int thread_remove_stream(struct audio_thread *thread,
437				struct cras_rstream *stream,
438				struct cras_iodev *dev)
439{
440	struct open_dev *open_dev;
441	struct timespec delay;
442	unsigned fetch_delay_msec;
443
444	/* Metrics log the longest fetch delay of this stream. */
445	if (timespec_after(&stream->longest_fetch_interval,
446			   &stream->sleep_interval_ts)) {
447		subtract_timespecs(&stream->longest_fetch_interval,
448				   &stream->sleep_interval_ts,
449				   &delay);
450		fetch_delay_msec = delay.tv_sec * 1000 +
451				   delay.tv_nsec / 1000000;
452		if (fetch_delay_msec)
453			cras_server_metrics_longest_fetch_delay(
454					fetch_delay_msec);
455	}
456
457	ATLOG(atlog,
458				    AUDIO_THREAD_STREAM_REMOVED,
459				    stream->stream_id, 0, 0);
460
461	if (dev == NULL) {
462		DL_FOREACH(thread->open_devs[stream->direction], open_dev) {
463			delete_stream_from_dev(open_dev->dev, stream);
464		}
465	} else {
466		delete_stream_from_dev(dev, stream);
467	}
468
469	return 0;
470}
471
472/* Handles the disconnect_stream message from the main thread. */
473static int thread_disconnect_stream(struct audio_thread* thread,
474				    struct cras_rstream* stream,
475				    struct cras_iodev *dev)
476{
477	int rc;
478
479	if (!thread_find_stream(thread, stream))
480		return 0;
481
482	rc = thread_remove_stream(thread, stream, dev);
483
484	return rc;
485}
486
487/* Initiates draining of a stream or returns the status of a draining stream.
488 * If the stream has completed draining the thread forfeits ownership and must
489 * never reference it again.  Returns the number of milliseconds it will take to
490 * finish draining, a minimum of one ms if any samples remain.
491 */
492static int thread_drain_stream_ms_remaining(struct audio_thread *thread,
493					    struct cras_rstream *rstream)
494{
495	int fr_in_buff;
496	struct cras_audio_shm *shm;
497
498	if (rstream->direction != CRAS_STREAM_OUTPUT)
499		return 0;
500
501	shm = cras_rstream_output_shm(rstream);
502	fr_in_buff = cras_shm_get_frames(shm);
503
504	if (fr_in_buff <= 0)
505		return 0;
506
507	cras_rstream_set_is_draining(rstream, 1);
508
509	return 1 + cras_frames_to_ms(fr_in_buff, rstream->format.frame_rate);
510}
511
512/* Handles a request to begin draining and return the amount of time left to
513 * draing a stream.
514 */
515static int thread_drain_stream(struct audio_thread *thread,
516			       struct cras_rstream *rstream)
517{
518	int ms_left;
519
520	if (!thread_find_stream(thread, rstream))
521		return 0;
522
523	ms_left = thread_drain_stream_ms_remaining(thread, rstream);
524	if (ms_left == 0)
525		thread_remove_stream(thread, rstream, NULL);
526
527	return ms_left;
528}
529
530/* Handles the add_stream message from the main thread. */
531static int thread_add_stream(struct audio_thread *thread,
532			     struct cras_rstream *stream,
533			     struct cras_iodev **iodevs,
534			     unsigned int num_iodevs)
535{
536	int rc;
537
538	rc = append_stream(thread, stream, iodevs, num_iodevs);
539	if (rc < 0)
540		return rc;
541
542	ATLOG(atlog,
543				    AUDIO_THREAD_STREAM_ADDED,
544				    stream->stream_id,
545				    num_iodevs ? iodevs[0]->info.idx : 0,
546				    num_iodevs);
547	return 0;
548}
549
550/* Reads any pending audio message from the socket. */
551static void flush_old_aud_messages(struct cras_audio_shm *shm, int fd)
552{
553	struct audio_message msg;
554	struct pollfd pollfd;
555	int err;
556
557	pollfd.fd = fd;
558	pollfd.events = POLLIN;
559
560	do {
561		err = poll(&pollfd, 1, 0);
562		if (pollfd.revents & POLLIN) {
563			err = read(fd, &msg, sizeof(msg));
564			cras_shm_set_callback_pending(shm, 0);
565		}
566	} while (err > 0);
567}
568
569/* Asks any stream with room for more data. Sets the time stamp for all streams.
570 * Args:
571 *    thread - The thread to fetch samples for.
572 *    adev - The output device streams are attached to.
573 * Returns:
574 *    0 on success, negative error on failure. If failed, can assume that all
575 *    streams have been removed from the device.
576 */
577static int fetch_streams(struct audio_thread *thread,
578			 struct open_dev *adev)
579{
580	struct dev_stream *dev_stream;
581	struct cras_iodev *odev = adev->dev;
582	int rc;
583	int delay;
584
585	delay = cras_iodev_delay_frames(odev);
586	if (delay < 0)
587		return delay;
588
589	DL_FOREACH(adev->dev->streams, dev_stream) {
590		struct cras_rstream *rstream = dev_stream->stream;
591		struct cras_audio_shm *shm =
592			cras_rstream_output_shm(rstream);
593		int fd = cras_rstream_get_audio_fd(rstream);
594		const struct timespec *next_cb_ts;
595		struct timespec now;
596
597		clock_gettime(CLOCK_MONOTONIC_RAW, &now);
598
599		if (cras_shm_callback_pending(shm) && fd >= 0) {
600			flush_old_aud_messages(shm, fd);
601			cras_rstream_record_fetch_interval(dev_stream->stream,
602							   &now);
603		}
604
605		if (cras_shm_get_frames(shm) < 0)
606			cras_rstream_set_is_draining(rstream, 1);
607
608		if (cras_rstream_get_is_draining(dev_stream->stream))
609			continue;
610
611		next_cb_ts = dev_stream_next_cb_ts(dev_stream);
612		if (!next_cb_ts)
613			continue;
614
615		/* Check if it's time to get more data from this stream.
616		 * Allowing for waking up half a little early. */
617		add_timespecs(&now, &playback_wake_fuzz_ts);
618		if (!timespec_after(&now, next_cb_ts))
619			continue;
620
621		if (!dev_stream_can_fetch(dev_stream)) {
622			ATLOG(
623				atlog, AUDIO_THREAD_STREAM_SKIP_CB,
624				rstream->stream_id,
625				shm->area->write_offset[0],
626				shm->area->write_offset[1]);
627			continue;
628		}
629
630		dev_stream_set_delay(dev_stream, delay);
631
632		ATLOG(
633				atlog,
634				AUDIO_THREAD_FETCH_STREAM,
635				rstream->stream_id,
636				cras_rstream_get_cb_threshold(rstream), delay);
637
638		rc = dev_stream_request_playback_samples(dev_stream, &now);
639		if (rc < 0) {
640			syslog(LOG_ERR, "fetch err: %d for %x",
641			       rc, rstream->stream_id);
642			cras_rstream_set_is_draining(rstream, 1);
643		}
644	}
645
646	return 0;
647}
648
649/* Fill the buffer with samples from the attached streams.
650 * Args:
651 *    thread - The thread object the device is attached to.
652 *    adev - The device to write to.
653 *    dst - The buffer to put the samples in (returned from snd_pcm_mmap_begin)
654 *    write_limit - The maximum number of frames to write to dst.
655 *
656 * Returns:
657 *    The number of frames rendered on success, a negative error code otherwise.
658 *    This number of frames is the minimum of the amount of frames each stream
659 *    could provide which is the maximum that can currently be rendered.
660 */
661static int write_streams(struct audio_thread *thread,
662			 struct open_dev *adev,
663			 uint8_t *dst,
664			 size_t write_limit)
665{
666	struct cras_iodev *odev = adev->dev;
667	struct dev_stream *curr;
668	unsigned int max_offset = 0;
669	unsigned int frame_bytes = cras_get_format_bytes(odev->ext_format);
670	unsigned int num_playing = 0;
671	unsigned int drain_limit = write_limit;
672
673	/* Mix as much as we can, the minimum fill level of any stream. */
674	max_offset = cras_iodev_max_stream_offset(odev);
675
676        /* Mix as much as we can, the minimum fill level of any stream. */
677	DL_FOREACH(adev->dev->streams, curr) {
678		int dev_frames;
679
680		/* If this is a single output dev stream, updates the latest
681		 * number of frames for playback. */
682		if (dev_stream_attached_devs(curr) == 1)
683			dev_stream_update_frames(curr);
684
685		dev_frames = dev_stream_playback_frames(curr);
686		if (dev_frames < 0) {
687			thread_remove_stream(thread, curr->stream, NULL);
688			continue;
689		}
690		ATLOG(atlog,
691				AUDIO_THREAD_WRITE_STREAMS_STREAM,
692				curr->stream->stream_id,
693				dev_frames,
694				cras_shm_callback_pending(cras_rstream_output_shm(curr->stream)));
695		if (cras_rstream_get_is_draining(curr->stream)) {
696			drain_limit = MIN((size_t)dev_frames, drain_limit);
697			if (!dev_frames)
698				thread_remove_stream(thread, curr->stream,
699						     NULL);
700		} else {
701			write_limit = MIN((size_t)dev_frames, write_limit);
702			num_playing++;
703		}
704	}
705
706	if (!num_playing)
707		write_limit = drain_limit;
708
709	if (write_limit > max_offset)
710		memset(dst + max_offset * frame_bytes, 0,
711		       (write_limit - max_offset) * frame_bytes);
712
713	ATLOG(atlog, AUDIO_THREAD_WRITE_STREAMS_MIX,
714				    write_limit, max_offset, 0);
715
716	DL_FOREACH(adev->dev->streams, curr) {
717		unsigned int offset;
718		int nwritten;
719
720		offset = cras_iodev_stream_offset(odev, curr);
721		if (offset >= write_limit)
722			continue;
723		nwritten = dev_stream_mix(curr, odev->ext_format,
724					  dst + frame_bytes * offset,
725					  write_limit - offset);
726
727		if (nwritten < 0) {
728			thread_remove_stream(thread, curr->stream, NULL);
729			continue;
730		}
731
732		cras_iodev_stream_written(odev, curr, nwritten);
733	}
734
735	write_limit = cras_iodev_all_streams_written(odev);
736
737	ATLOG(atlog, AUDIO_THREAD_WRITE_STREAMS_MIXED,
738				    write_limit, 0, 0);
739
740	return write_limit;
741}
742
743/* Gets the max delay frames of open input devices. */
744static int input_delay_frames(struct open_dev *adevs)
745{
746	struct open_dev *adev;
747	int delay;
748	int max_delay = 0;
749
750	DL_FOREACH(adevs, adev) {
751		if (!cras_iodev_is_open(adev->dev))
752			continue;
753		delay = cras_iodev_delay_frames(adev->dev);
754		if (delay < 0)
755			return delay;
756		if (delay > max_delay)
757			max_delay = delay;
758	}
759	return max_delay;
760}
761
762/* Stop the playback thread */
763static void terminate_pb_thread()
764{
765	pthread_exit(0);
766}
767
768static void append_dev_dump_info(struct audio_dev_debug_info *di,
769				 struct open_dev *adev)
770{
771	struct cras_audio_format *fmt = adev->dev->ext_format;
772	strncpy(di->dev_name, adev->dev->info.name, sizeof(di->dev_name));
773	di->buffer_size = adev->dev->buffer_size;
774	di->min_buffer_level = adev->dev->min_buffer_level;
775	di->min_cb_level = adev->dev->min_cb_level;
776	di->max_cb_level = adev->dev->max_cb_level;
777	di->direction = adev->dev->direction;
778	di->num_underruns = cras_iodev_get_num_underruns(adev->dev);
779	di->num_severe_underruns = cras_iodev_get_num_severe_underruns(
780			adev->dev);
781	if (fmt) {
782		di->frame_rate = fmt->frame_rate;
783		di->num_channels = fmt->num_channels;
784		di->est_rate_ratio = cras_iodev_get_est_rate_ratio(adev->dev);
785	} else {
786		di->frame_rate = 0;
787		di->num_channels = 0;
788		di->est_rate_ratio = 0;
789	}
790}
791
792/* Put stream info for the given stream into the info struct. */
793static void append_stream_dump_info(struct audio_debug_info *info,
794				    struct dev_stream *stream,
795				    unsigned int dev_idx,
796				    int index)
797{
798	struct audio_stream_debug_info *si;
799
800	si = &info->streams[index];
801
802	si->stream_id = stream->stream->stream_id;
803	si->dev_idx = dev_idx;
804	si->direction = stream->stream->direction;
805	si->stream_type = stream->stream->stream_type;
806	si->buffer_frames = stream->stream->buffer_frames;
807	si->cb_threshold = stream->stream->cb_threshold;
808	si->frame_rate = stream->stream->format.frame_rate;
809	si->num_channels = stream->stream->format.num_channels;
810	memcpy(si->channel_layout, stream->stream->format.channel_layout,
811	       sizeof(si->channel_layout));
812	si->longest_fetch_sec = stream->stream->longest_fetch_interval.tv_sec;
813	si->longest_fetch_nsec = stream->stream->longest_fetch_interval.tv_nsec;
814	si->num_overruns = cras_shm_num_overruns(&stream->stream->shm);
815
816	longest_wake.tv_sec = 0;
817	longest_wake.tv_nsec = 0;
818}
819
820/* Handle a message sent to the playback thread */
821static int handle_playback_thread_message(struct audio_thread *thread)
822{
823	uint8_t buf[256];
824	struct audio_thread_msg *msg = (struct audio_thread_msg *)buf;
825	int ret = 0;
826	int err;
827
828	err = audio_thread_read_command(thread, buf, 256);
829	if (err < 0)
830		return err;
831
832	ATLOG(atlog, AUDIO_THREAD_PB_MSG, msg->id, 0, 0);
833
834	switch (msg->id) {
835	case AUDIO_THREAD_ADD_STREAM: {
836		struct audio_thread_add_rm_stream_msg *amsg;
837		amsg = (struct audio_thread_add_rm_stream_msg *)msg;
838		ATLOG(
839			atlog,
840			AUDIO_THREAD_WRITE_STREAMS_WAIT,
841			amsg->stream->stream_id, 0, 0);
842		ret = thread_add_stream(thread, amsg->stream, amsg->devs,
843				amsg->num_devs);
844		break;
845	}
846	case AUDIO_THREAD_DISCONNECT_STREAM: {
847		struct audio_thread_add_rm_stream_msg *rmsg;
848
849		rmsg = (struct audio_thread_add_rm_stream_msg *)msg;
850
851		ret = thread_disconnect_stream(thread, rmsg->stream,
852				rmsg->devs[0]);
853		break;
854	}
855	case AUDIO_THREAD_ADD_OPEN_DEV: {
856		struct audio_thread_open_device_msg *rmsg;
857
858		rmsg = (struct audio_thread_open_device_msg *)msg;
859		ret = thread_add_open_dev(thread, rmsg->dev);
860		break;
861	}
862	case AUDIO_THREAD_RM_OPEN_DEV: {
863		struct audio_thread_open_device_msg *rmsg;
864
865		rmsg = (struct audio_thread_open_device_msg *)msg;
866		ret = thread_rm_open_dev(thread, rmsg->dev);
867		break;
868	}
869	case AUDIO_THREAD_STOP:
870		ret = 0;
871		err = audio_thread_send_response(thread, ret);
872		if (err < 0)
873			return err;
874		terminate_pb_thread();
875		break;
876	case AUDIO_THREAD_DUMP_THREAD_INFO: {
877		struct dev_stream *curr;
878		struct open_dev *adev;
879		struct audio_thread_dump_debug_info_msg *dmsg;
880		struct audio_debug_info *info;
881		unsigned int num_streams = 0;
882		unsigned int num_devs = 0;
883
884		ret = 0;
885		dmsg = (struct audio_thread_dump_debug_info_msg *)msg;
886		info = dmsg->info;
887
888		/* Go through all open devices. */
889		DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) {
890			append_dev_dump_info(&info->devs[num_devs], adev);
891			if (++num_devs == MAX_DEBUG_DEVS)
892				break;
893			DL_FOREACH(adev->dev->streams, curr) {
894				if (num_streams == MAX_DEBUG_STREAMS)
895					break;
896				append_stream_dump_info(info, curr,
897							adev->dev->info.idx,
898							num_streams++);
899			}
900		}
901		DL_FOREACH(thread->open_devs[CRAS_STREAM_INPUT], adev) {
902			if (num_devs == MAX_DEBUG_DEVS)
903				break;
904			append_dev_dump_info(&info->devs[num_devs], adev);
905			DL_FOREACH(adev->dev->streams, curr) {
906				if (num_streams == MAX_DEBUG_STREAMS)
907					break;
908				append_stream_dump_info(info, curr,
909							adev->dev->info.idx,
910							num_streams++);
911			}
912			++num_devs;
913		}
914		info->num_devs = num_devs;
915
916		info->num_streams = num_streams;
917
918		memcpy(&info->log, atlog, sizeof(info->log));
919		break;
920	}
921	case AUDIO_THREAD_DRAIN_STREAM: {
922		struct audio_thread_add_rm_stream_msg *rmsg;
923
924		rmsg = (struct audio_thread_add_rm_stream_msg *)msg;
925		ret = thread_drain_stream(thread, rmsg->stream);
926		break;
927	}
928	case AUDIO_THREAD_REMOVE_CALLBACK: {
929		struct audio_thread_rm_callback_msg *rmsg;
930
931		rmsg = (struct audio_thread_rm_callback_msg *)msg;
932		audio_thread_rm_callback(rmsg->fd);
933		break;
934	}
935	case AUDIO_THREAD_CONFIG_GLOBAL_REMIX: {
936		struct audio_thread_config_global_remix *rmsg;
937		void *rsp;
938
939		/* Respond the pointer to the old remix converter, so it can be
940		 * freed later in main thread. */
941		rsp = (void *)remix_converter;
942
943		rmsg = (struct audio_thread_config_global_remix *)msg;
944		remix_converter = rmsg->fmt_conv;
945
946		return write(thread->to_main_fds[1], &rsp, sizeof(rsp));
947	}
948	case AUDIO_THREAD_DEV_START_RAMP: {
949		struct audio_thread_dev_start_ramp_msg *rmsg;
950
951		rmsg = (struct audio_thread_dev_start_ramp_msg*)msg;
952		ret = thread_dev_start_ramp(thread, rmsg->dev, rmsg->request);
953		break;
954	}
955	default:
956		ret = -EINVAL;
957		break;
958	}
959
960	err = audio_thread_send_response(thread, ret);
961	if (err < 0)
962		return err;
963	return ret;
964}
965
966/* Fills the time that the next stream needs to be serviced. */
967static int get_next_stream_wake_from_list(struct dev_stream *streams,
968					  struct timespec *min_ts)
969{
970	struct dev_stream *dev_stream;
971	int ret = 0; /* The total number of streams to wait on. */
972
973	DL_FOREACH(streams, dev_stream) {
974		const struct timespec *next_cb_ts;
975
976		if (cras_rstream_get_is_draining(dev_stream->stream) &&
977		    dev_stream_playback_frames(dev_stream) <= 0)
978			continue;
979		if (!dev_stream_can_fetch(dev_stream))
980			continue;
981
982		next_cb_ts = dev_stream_next_cb_ts(dev_stream);
983		if (!next_cb_ts)
984			continue;
985
986		ATLOG(atlog,
987					    AUDIO_THREAD_STREAM_SLEEP_TIME,
988					    dev_stream->stream->stream_id,
989					    next_cb_ts->tv_sec,
990					    next_cb_ts->tv_nsec);
991		if (timespec_after(min_ts, next_cb_ts))
992			*min_ts = *next_cb_ts;
993		ret++;
994	}
995
996	return ret;
997}
998
999static int get_next_output_wake(struct audio_thread *thread,
1000				 struct timespec *min_ts,
1001				 const struct timespec *now)
1002{
1003	struct open_dev *adev;
1004	struct timespec sleep_time;
1005	double est_rate;
1006	int ret = 0;
1007	unsigned int frames_to_play_in_sleep;
1008	unsigned int hw_level = 0;
1009
1010	DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev)
1011		ret += get_next_stream_wake_from_list(
1012				adev->dev->streams,
1013				min_ts);
1014
1015	DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) {
1016		if (!cras_iodev_odev_should_wake(adev->dev))
1017			continue;
1018
1019		frames_to_play_in_sleep = cras_iodev_frames_to_play_in_sleep(
1020				adev->dev, &hw_level, &adev->wake_ts);
1021		if (!timespec_is_nonzero(&adev->wake_ts))
1022			adev->wake_ts = *now;
1023
1024		est_rate = adev->dev->ext_format->frame_rate *
1025				cras_iodev_get_est_rate_ratio(adev->dev);
1026
1027		ATLOG(atlog,
1028	              AUDIO_THREAD_SET_DEV_WAKE,
1029		      adev->dev->info.idx,
1030		      hw_level,
1031		      frames_to_play_in_sleep);
1032
1033		cras_frames_to_time_precise(
1034				frames_to_play_in_sleep,
1035				est_rate,
1036				&sleep_time);
1037
1038		add_timespecs(&adev->wake_ts, &sleep_time);
1039
1040		ret++;
1041		ATLOG(atlog,
1042					    AUDIO_THREAD_DEV_SLEEP_TIME,
1043					    adev->dev->info.idx,
1044					    adev->wake_ts.tv_sec,
1045					    adev->wake_ts.tv_nsec);
1046		if (timespec_after(min_ts, &adev->wake_ts))
1047			*min_ts = adev->wake_ts;
1048	}
1049
1050	return ret;
1051}
1052
1053static int input_adev_ignore_wake(const struct open_dev *adev)
1054{
1055	if (!cras_iodev_is_open(adev->dev))
1056		return 1;
1057
1058	if (!adev->dev->active_node)
1059		return 1;
1060
1061	if (adev->dev->active_node->type == CRAS_NODE_TYPE_HOTWORD &&
1062	    !adev->input_streaming)
1063		return 1;
1064
1065	return 0;
1066}
1067
1068static int get_next_input_wake(struct audio_thread *thread,
1069			       struct timespec *min_ts,
1070			       const struct timespec *now)
1071{
1072	struct open_dev *adev;
1073	int ret = 0; /* The total number of devices to wait on. */
1074
1075	DL_FOREACH(thread->open_devs[CRAS_STREAM_INPUT], adev) {
1076		if (input_adev_ignore_wake(adev))
1077			continue;
1078		ret++;
1079		ATLOG(atlog,
1080					    AUDIO_THREAD_DEV_SLEEP_TIME,
1081					    adev->dev->info.idx,
1082					    adev->wake_ts.tv_sec,
1083					    adev->wake_ts.tv_nsec);
1084		if (timespec_after(min_ts, &adev->wake_ts))
1085			*min_ts = adev->wake_ts;
1086	}
1087
1088	return ret;
1089}
1090
1091static int output_stream_fetch(struct audio_thread *thread)
1092{
1093	struct open_dev *odev_list = thread->open_devs[CRAS_STREAM_OUTPUT];
1094	struct open_dev *adev;
1095
1096	DL_FOREACH(odev_list, adev) {
1097		if (!cras_iodev_is_open(adev->dev))
1098			continue;
1099		fetch_streams(thread, adev);
1100	}
1101
1102	return 0;
1103}
1104
1105static int wait_pending_output_streams(struct audio_thread *thread)
1106{
1107	/* TODO(dgreid) - is this needed? */
1108	return 0;
1109}
1110
1111/* Gets the master device which the stream is attached to. */
1112static inline
1113struct cras_iodev *get_master_dev(const struct dev_stream *stream)
1114{
1115	return (struct cras_iodev *)stream->stream->master_dev.dev_ptr;
1116}
1117
1118/* Updates the estimated sample rate of open device to all attached
1119 * streams.
1120 */
1121static void update_estimated_rate(struct audio_thread *thread,
1122				  struct open_dev *adev)
1123{
1124	struct cras_iodev *master_dev;
1125	struct cras_iodev *dev = adev->dev;
1126	struct dev_stream *dev_stream;
1127
1128	DL_FOREACH(dev->streams, dev_stream) {
1129		master_dev = get_master_dev(dev_stream);
1130		if (master_dev == NULL) {
1131			syslog(LOG_ERR, "Fail to find master open dev.");
1132			continue;
1133		}
1134
1135		dev_stream_set_dev_rate(dev_stream,
1136				dev->ext_format->frame_rate,
1137				cras_iodev_get_est_rate_ratio(dev),
1138				cras_iodev_get_est_rate_ratio(master_dev),
1139				adev->coarse_rate_adjust);
1140	}
1141}
1142
1143/* Returns 0 on success negative error on device failure. */
1144static int write_output_samples(struct audio_thread *thread,
1145				struct open_dev *adev)
1146{
1147	struct cras_iodev *odev = adev->dev;
1148	unsigned int hw_level;
1149	struct timespec hw_tstamp;
1150	unsigned int frames, fr_to_req;
1151	snd_pcm_sframes_t written;
1152	snd_pcm_uframes_t total_written = 0;
1153	int rc;
1154	uint8_t *dst = NULL;
1155	struct cras_audio_area *area = NULL;
1156
1157	/* Possibly fill zeros for no_stream state and possibly transit state.
1158	 */
1159	rc = cras_iodev_prepare_output_before_write_samples(odev);
1160	if (rc < 0) {
1161		syslog(LOG_ERR, "Failed to prepare output dev for write");
1162		return rc;
1163	}
1164
1165	if (cras_iodev_state(odev) != CRAS_IODEV_STATE_NORMAL_RUN)
1166		return 0;
1167
1168	rc = cras_iodev_frames_queued(odev, &hw_tstamp);
1169	if (rc < 0)
1170		return rc;
1171	hw_level = rc;
1172
1173	ATLOG(atlog, AUDIO_THREAD_FILL_AUDIO_TSTAMP, adev->dev->info.idx,
1174	      hw_tstamp.tv_sec, hw_tstamp.tv_nsec);
1175	if (timespec_is_nonzero(&hw_tstamp)) {
1176		if (hw_level < odev->min_cb_level / 2)
1177			adev->coarse_rate_adjust = 1;
1178		else if (hw_level > odev->max_cb_level * 2)
1179			adev->coarse_rate_adjust = -1;
1180		else
1181			adev->coarse_rate_adjust = 0;
1182
1183		if (cras_iodev_update_rate(odev, hw_level, &hw_tstamp))
1184			update_estimated_rate(thread, adev);
1185	}
1186	ATLOG(atlog, AUDIO_THREAD_FILL_AUDIO,
1187				    adev->dev->info.idx, hw_level, 0);
1188
1189	/* Don't request more than hardware can hold. Note that min_buffer_level
1190	 * has been subtracted from the actual hw_level so we need to take it
1191	 * into account here. */
1192	fr_to_req = cras_iodev_buffer_avail(odev, hw_level);
1193
1194	/* Have to loop writing to the device, will be at most 2 loops, this
1195	 * only happens when the circular buffer is at the end and returns us a
1196	 * partial area to write to from mmap_begin */
1197	while (total_written < fr_to_req) {
1198		frames = fr_to_req - total_written;
1199		rc = cras_iodev_get_output_buffer(odev, &area, &frames);
1200		if (rc < 0)
1201			return rc;
1202
1203		/* TODO(dgreid) - This assumes interleaved audio. */
1204		dst = area->channels[0].buf;
1205		written = write_streams(thread, adev, dst, frames);
1206		if (written < 0) /* pcm has been closed */
1207			return (int)written;
1208
1209		if (written < (snd_pcm_sframes_t)frames)
1210			/* Got all the samples from client that we can, but it
1211			 * won't fill the request. */
1212			fr_to_req = 0; /* break out after committing samples */
1213
1214		rc = cras_iodev_put_output_buffer(odev, dst, written);
1215		if (rc < 0)
1216			return rc;
1217		total_written += written;
1218	}
1219
1220	/* Empty hardware and nothing written, zero fill it if it is running. */
1221	if (!hw_level && !total_written &&
1222	    odev->min_cb_level < odev->buffer_size)
1223		cras_iodev_output_underrun(odev);
1224
1225	ATLOG(atlog, AUDIO_THREAD_FILL_AUDIO_DONE,
1226			hw_level, total_written, odev->min_cb_level);
1227	return 0;
1228}
1229
1230static int do_playback(struct audio_thread *thread)
1231{
1232	struct open_dev *adev;
1233	struct dev_stream *curr;
1234	int rc;
1235
1236	/* For multiple output case, update the number of queued frames in shm
1237	 * of all streams before starting write output samples. */
1238	adev = thread->open_devs[CRAS_STREAM_OUTPUT];
1239	if (adev && adev->next) {
1240		DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) {
1241			DL_FOREACH(adev->dev->streams, curr)
1242				dev_stream_update_frames(curr);
1243		}
1244	}
1245
1246	DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) {
1247		if (!cras_iodev_is_open(adev->dev))
1248			continue;
1249
1250		rc = write_output_samples(thread, adev);
1251		if (rc < 0) {
1252			if (rc == -EPIPE) {
1253				/* Handle severe underrun. */
1254				ATLOG(atlog, AUDIO_THREAD_SEVERE_UNDERRUN,
1255				      adev->dev->info.idx, 0, 0);
1256				cras_iodev_reset_request(adev->dev);
1257			} else {
1258				/* Device error, close it. */
1259				thread_rm_open_adev(thread, adev);
1260			}
1261		}
1262	}
1263
1264	/* TODO(dgreid) - once per rstream, not once per dev_stream. */
1265	DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) {
1266		struct dev_stream *stream;
1267		if (!cras_iodev_is_open(adev->dev))
1268			continue;
1269		DL_FOREACH(adev->dev->streams, stream) {
1270			dev_stream_playback_update_rstream(stream);
1271		}
1272	}
1273
1274	return 0;
1275}
1276
1277/* Gets the minimum amount of space available for writing across all streams.
1278 * Args:
1279 *    adev - The device to capture from.
1280 *    write_limit - Initial limit to number of frames to capture.
1281 */
1282static unsigned int get_stream_limit_set_delay(struct open_dev *adev,
1283					      unsigned int write_limit)
1284{
1285	struct cras_rstream *rstream;
1286	struct cras_audio_shm *shm;
1287	struct dev_stream *stream;
1288	int delay;
1289	unsigned int avail;
1290
1291	/* TODO(dgreid) - Setting delay from last dev only. */
1292	delay = input_delay_frames(adev);
1293
1294	DL_FOREACH(adev->dev->streams, stream) {
1295		rstream = stream->stream;
1296
1297		shm = cras_rstream_input_shm(rstream);
1298		if (cras_shm_check_write_overrun(shm))
1299			ATLOG(atlog, AUDIO_THREAD_READ_OVERRUN,
1300			      adev->dev->info.idx, rstream->stream_id,
1301			      shm->area->num_overruns);
1302		dev_stream_set_delay(stream, delay);
1303		avail = dev_stream_capture_avail(stream);
1304		write_limit = MIN(write_limit, avail);
1305	}
1306
1307	return write_limit;
1308}
1309
1310/* Read samples from an input device to the specified stream.
1311 * Args:
1312 *    adev - The device to capture samples from.
1313 * Returns 0 on success.
1314 */
1315static int capture_to_streams(struct audio_thread *thread,
1316			      struct open_dev *adev)
1317{
1318	struct cras_iodev *idev = adev->dev;
1319	snd_pcm_uframes_t remainder, hw_level, cap_limit;
1320	struct timespec hw_tstamp;
1321	int rc;
1322
1323	rc = cras_iodev_frames_queued(idev, &hw_tstamp);
1324	if (rc < 0)
1325		return rc;
1326	hw_level = rc;
1327
1328	ATLOG(atlog, AUDIO_THREAD_READ_AUDIO_TSTAMP, idev->info.idx,
1329	      hw_tstamp.tv_sec, hw_tstamp.tv_nsec);
1330	if (timespec_is_nonzero(&hw_tstamp)) {
1331		if (hw_level)
1332			adev->input_streaming = 1;
1333
1334		if (hw_level < idev->min_cb_level / 2)
1335			adev->coarse_rate_adjust = 1;
1336		else if (hw_level > idev->max_cb_level * 2)
1337			adev->coarse_rate_adjust = -1;
1338		else
1339			adev->coarse_rate_adjust = 0;
1340		if (cras_iodev_update_rate(idev, hw_level, &hw_tstamp))
1341			update_estimated_rate(thread, adev);
1342	}
1343
1344	cap_limit = get_stream_limit_set_delay(adev, hw_level);
1345	remainder = MIN(hw_level, cap_limit);
1346
1347	ATLOG(atlog, AUDIO_THREAD_READ_AUDIO,
1348				    idev->info.idx, hw_level, remainder);
1349
1350	if (cras_iodev_state(idev) != CRAS_IODEV_STATE_NORMAL_RUN)
1351		return 0;
1352
1353	while (remainder > 0) {
1354		struct cras_audio_area *area = NULL;
1355		struct dev_stream *stream;
1356		unsigned int nread, total_read;
1357
1358		nread = remainder;
1359
1360		rc = cras_iodev_get_input_buffer(idev, &area, &nread);
1361		if (rc < 0 || nread == 0)
1362			return rc;
1363
1364		DL_FOREACH(adev->dev->streams, stream) {
1365			unsigned int this_read;
1366			unsigned int area_offset;
1367
1368			area_offset = cras_iodev_stream_offset(idev, stream);
1369			this_read = dev_stream_capture(
1370				stream, area, area_offset,
1371				cras_iodev_get_software_gain_scaler(idev));
1372
1373			cras_iodev_stream_written(idev, stream, this_read);
1374		}
1375		if (adev->dev->streams)
1376			total_read = cras_iodev_all_streams_written(idev);
1377		else
1378			total_read = nread; /* No streams, drop. */
1379
1380		rc = cras_iodev_put_input_buffer(idev, total_read);
1381		if (rc < 0)
1382			return rc;
1383		remainder -= nread;
1384
1385		if (total_read < nread)
1386			break;
1387	}
1388
1389	ATLOG(atlog, AUDIO_THREAD_READ_AUDIO_DONE,
1390				    remainder, 0, 0);
1391
1392	return 0;
1393}
1394
1395static int do_capture(struct audio_thread *thread)
1396{
1397	struct open_dev *idev_list = thread->open_devs[CRAS_STREAM_INPUT];
1398	struct open_dev *adev;
1399
1400	DL_FOREACH(idev_list, adev) {
1401		if (!cras_iodev_is_open(adev->dev))
1402			continue;
1403		if (capture_to_streams(thread, adev) < 0)
1404			thread_rm_open_adev(thread, adev);
1405	}
1406
1407	return 0;
1408}
1409
1410/*
1411 * Set wake_ts for this device to be the earliest wake up time for
1412 * dev_streams.
1413 */
1414static int set_input_dev_wake_ts(struct open_dev *adev)
1415{
1416	int rc;
1417	struct timespec level_tstamp, wake_time_out, min_ts, now;
1418	unsigned int curr_level;
1419	struct dev_stream *stream;
1420
1421	/* Limit the sleep time to 20 seconds. */
1422	min_ts.tv_sec = 20;
1423	min_ts.tv_nsec = 0;
1424	clock_gettime(CLOCK_MONOTONIC_RAW, &now);
1425	add_timespecs(&min_ts, &now);
1426
1427	curr_level = cras_iodev_frames_queued(adev->dev, &level_tstamp);
1428	if (!timespec_is_nonzero(&level_tstamp))
1429		clock_gettime(CLOCK_MONOTONIC_RAW, &level_tstamp);
1430
1431	/*
1432	 * Loop through streams to find the earliest time audio thread
1433	 * should wake up.
1434	 */
1435	DL_FOREACH(adev->dev->streams, stream) {
1436		rc = dev_stream_wake_time(
1437			stream,
1438			curr_level,
1439			&level_tstamp,
1440			&wake_time_out);
1441
1442		if (rc < 0)
1443			return rc;
1444
1445		if (timespec_after(&min_ts, &wake_time_out)) {
1446			min_ts = wake_time_out;
1447		}
1448	}
1449	adev->wake_ts = min_ts;
1450	return 0;
1451}
1452
1453static int send_captured_samples(struct audio_thread *thread)
1454{
1455	struct open_dev *idev_list = thread->open_devs[CRAS_STREAM_INPUT];
1456	struct open_dev *adev;
1457	int rc;
1458
1459	// TODO(dgreid) - once per rstream, not once per dev_stream.
1460	DL_FOREACH(idev_list, adev) {
1461		struct dev_stream *stream;
1462
1463		if (!cras_iodev_is_open(adev->dev))
1464			continue;
1465
1466		/* Post samples to rstream if there are enough samples. */
1467		DL_FOREACH(adev->dev->streams, stream) {
1468			dev_stream_capture_update_rstream(stream);
1469		}
1470
1471		/* Set wake_ts for this device. */
1472		rc = set_input_dev_wake_ts(adev);
1473		if (rc < 0)
1474			return rc;
1475	}
1476
1477	return 0;
1478}
1479
1480/* Reads and/or writes audio sampels from/to the devices. */
1481static int stream_dev_io(struct audio_thread *thread)
1482{
1483	output_stream_fetch(thread);
1484	do_capture(thread);
1485	send_captured_samples(thread);
1486	wait_pending_output_streams(thread);
1487	do_playback(thread);
1488
1489	return 0;
1490}
1491
1492int fill_next_sleep_interval(struct audio_thread *thread, struct timespec *ts)
1493{
1494	struct timespec min_ts;
1495	struct timespec now;
1496	int ret; /* The sum of active streams and devices. */
1497
1498	ts->tv_sec = 0;
1499	ts->tv_nsec = 0;
1500	/* Limit the sleep time to 20 seconds. */
1501	min_ts.tv_sec = 20;
1502	min_ts.tv_nsec = 0;
1503	clock_gettime(CLOCK_MONOTONIC_RAW, &now);
1504	add_timespecs(&min_ts, &now);
1505	ret = get_next_output_wake(thread, &min_ts, &now);
1506	ret += get_next_input_wake(thread, &min_ts, &now);
1507	if (timespec_after(&min_ts, &now))
1508		subtract_timespecs(&min_ts, &now, ts);
1509
1510	return ret;
1511}
1512
1513/* For playback, fill the audio buffer when needed, for capture, pull out
1514 * samples when they are ready.
1515 * This thread will attempt to run at a high priority to allow for low latency
1516 * streams.  This thread sleeps while the device plays back or captures audio,
1517 * it will wake up as little as it can while avoiding xruns.  It can also be
1518 * woken by sending it a message using the "audio_thread_post_message" function.
1519 */
1520static void *audio_io_thread(void *arg)
1521{
1522	struct audio_thread *thread = (struct audio_thread *)arg;
1523	struct open_dev *adev;
1524	struct dev_stream *curr;
1525	struct timespec ts, now, last_wake;
1526	struct pollfd *pollfds;
1527	unsigned int num_pollfds;
1528	unsigned int pollfds_size = 32;
1529	int msg_fd;
1530	int rc;
1531
1532	msg_fd = thread->to_thread_fds[0];
1533
1534	/* Attempt to get realtime scheduling */
1535	if (cras_set_rt_scheduling(CRAS_SERVER_RT_THREAD_PRIORITY) == 0)
1536		cras_set_thread_priority(CRAS_SERVER_RT_THREAD_PRIORITY);
1537
1538	last_wake.tv_sec = 0;
1539	longest_wake.tv_sec = 0;
1540	longest_wake.tv_nsec = 0;
1541
1542	pollfds = (struct pollfd *)malloc(sizeof(*pollfds) * pollfds_size);
1543	pollfds[0].fd = msg_fd;
1544	pollfds[0].events = POLLIN;
1545
1546	while (1) {
1547		struct timespec *wait_ts;
1548		struct iodev_callback_list *iodev_cb;
1549
1550		wait_ts = NULL;
1551		num_pollfds = 1;
1552
1553		/* device opened */
1554		rc = stream_dev_io(thread);
1555		if (rc < 0)
1556			syslog(LOG_ERR, "audio cb error %d", rc);
1557
1558		if (fill_next_sleep_interval(thread, &ts))
1559			wait_ts = &ts;
1560
1561restart_poll_loop:
1562		num_pollfds = 1;
1563
1564		DL_FOREACH(iodev_callbacks, iodev_cb) {
1565			if (!iodev_cb->enabled)
1566				continue;
1567			pollfds[num_pollfds].fd = iodev_cb->fd;
1568			iodev_cb->pollfd = &pollfds[num_pollfds];
1569			if (iodev_cb->is_write)
1570				pollfds[num_pollfds].events = POLLOUT;
1571			else
1572				pollfds[num_pollfds].events = POLLIN;
1573			num_pollfds++;
1574			if (num_pollfds >= pollfds_size) {
1575				pollfds_size *= 2;
1576				pollfds = (struct pollfd *)realloc(pollfds,
1577					sizeof(*pollfds) * pollfds_size);
1578				goto restart_poll_loop;
1579			}
1580		}
1581
1582		/* TODO(dgreid) - once per rstream not per dev_stream */
1583		DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) {
1584			DL_FOREACH(adev->dev->streams, curr) {
1585				int fd = dev_stream_poll_stream_fd(curr);
1586				if (fd < 0)
1587					continue;
1588				pollfds[num_pollfds].fd = fd;
1589				pollfds[num_pollfds].events = POLLIN;
1590				num_pollfds++;
1591				if (num_pollfds >= pollfds_size) {
1592					pollfds_size *= 2;
1593					pollfds = (struct pollfd *)realloc(
1594							pollfds,
1595							sizeof(*pollfds) *
1596								pollfds_size);
1597					goto restart_poll_loop;
1598				}
1599			}
1600		}
1601
1602		if (last_wake.tv_sec) {
1603			struct timespec this_wake;
1604			clock_gettime(CLOCK_MONOTONIC_RAW, &now);
1605			subtract_timespecs(&now, &last_wake, &this_wake);
1606			if (timespec_after(&this_wake, &longest_wake))
1607				longest_wake = this_wake;
1608		}
1609
1610		ATLOG(atlog, AUDIO_THREAD_SLEEP,
1611					    wait_ts ? wait_ts->tv_sec : 0,
1612					    wait_ts ? wait_ts->tv_nsec : 0,
1613					    longest_wake.tv_nsec);
1614		rc = ppoll(pollfds, num_pollfds, wait_ts, NULL);
1615		clock_gettime(CLOCK_MONOTONIC_RAW, &last_wake);
1616		ATLOG(atlog, AUDIO_THREAD_WAKE, rc, 0, 0);
1617		if (rc <= 0)
1618			continue;
1619
1620		if (pollfds[0].revents & POLLIN) {
1621			rc = handle_playback_thread_message(thread);
1622			if (rc < 0)
1623				syslog(LOG_INFO, "handle message %d", rc);
1624		}
1625
1626		DL_FOREACH(iodev_callbacks, iodev_cb) {
1627			if (iodev_cb->pollfd &&
1628			    iodev_cb->pollfd->revents & (POLLIN | POLLOUT)) {
1629				ATLOG(
1630					atlog, AUDIO_THREAD_IODEV_CB,
1631					iodev_cb->is_write, 0, 0);
1632				iodev_cb->cb(iodev_cb->cb_data);
1633			}
1634		}
1635	}
1636
1637	return NULL;
1638}
1639
1640/* Write a message to the playback thread and wait for an ack, This keeps these
1641 * operations synchronous for the main server thread.  For instance when the
1642 * RM_STREAM message is sent, the stream can be deleted after the function
1643 * returns.  Making this synchronous also allows the thread to return an error
1644 * code that can be handled by the caller.
1645 * Args:
1646 *    thread - thread to receive message.
1647 *    msg - The message to send.
1648 * Returns:
1649 *    A return code from the message handler in the thread.
1650 */
1651static int audio_thread_post_message(struct audio_thread *thread,
1652				     struct audio_thread_msg *msg)
1653{
1654	int err;
1655	void *rsp;
1656
1657	err = write(thread->to_thread_fds[1], msg, msg->length);
1658	if (err < 0) {
1659		syslog(LOG_ERR, "Failed to post message to thread.");
1660		return err;
1661	}
1662	/* Synchronous action, wait for response. */
1663	err = read(thread->to_main_fds[0], &rsp, sizeof(rsp));
1664	if (err < 0) {
1665		syslog(LOG_ERR, "Failed to read reply from thread.");
1666		return err;
1667	}
1668
1669	return (intptr_t)rsp;
1670}
1671
1672static void init_open_device_msg(struct audio_thread_open_device_msg *msg,
1673				 enum AUDIO_THREAD_COMMAND id,
1674				 struct cras_iodev *dev)
1675{
1676	memset(msg, 0, sizeof(*msg));
1677	msg->header.id = id;
1678	msg->header.length = sizeof(*msg);
1679	msg->dev = dev;
1680}
1681
1682static void init_add_rm_stream_msg(struct audio_thread_add_rm_stream_msg *msg,
1683				   enum AUDIO_THREAD_COMMAND id,
1684				   struct cras_rstream *stream,
1685				   struct cras_iodev **devs,
1686				   unsigned int num_devs)
1687{
1688	memset(msg, 0, sizeof(*msg));
1689	msg->header.id = id;
1690	msg->header.length = sizeof(*msg);
1691	msg->stream = stream;
1692	msg->devs = devs;
1693	msg->num_devs = num_devs;
1694}
1695
1696static void init_dump_debug_info_msg(
1697		struct audio_thread_dump_debug_info_msg *msg,
1698		struct audio_debug_info *info)
1699{
1700	memset(msg, 0, sizeof(*msg));
1701	msg->header.id = AUDIO_THREAD_DUMP_THREAD_INFO;
1702	msg->header.length = sizeof(*msg);
1703	msg->info = info;
1704}
1705
1706static void init_config_global_remix_msg(
1707		struct audio_thread_config_global_remix *msg)
1708{
1709	memset(msg, 0, sizeof(*msg));
1710	msg->header.id = AUDIO_THREAD_CONFIG_GLOBAL_REMIX;
1711	msg->header.length = sizeof(*msg);
1712}
1713
1714static void init_device_start_ramp_msg(
1715		struct audio_thread_dev_start_ramp_msg *msg,
1716		enum AUDIO_THREAD_COMMAND id,
1717		struct cras_iodev *dev,
1718		enum CRAS_IODEV_RAMP_REQUEST request)
1719{
1720	memset(msg, 0, sizeof(*msg));
1721	msg->header.id = id;
1722	msg->header.length = sizeof(*msg);
1723	msg->dev = dev;
1724	msg->request = request;
1725}
1726
1727/* Exported Interface */
1728
1729int audio_thread_add_stream(struct audio_thread *thread,
1730			    struct cras_rstream *stream,
1731			    struct cras_iodev **devs,
1732			    unsigned int num_devs)
1733{
1734	struct audio_thread_add_rm_stream_msg msg;
1735
1736	assert(thread && stream);
1737
1738	if (!thread->started)
1739		return -EINVAL;
1740
1741	init_add_rm_stream_msg(&msg, AUDIO_THREAD_ADD_STREAM, stream,
1742			       devs, num_devs);
1743	return audio_thread_post_message(thread, &msg.header);
1744}
1745
1746int audio_thread_disconnect_stream(struct audio_thread *thread,
1747				   struct cras_rstream *stream,
1748				   struct cras_iodev *dev)
1749{
1750	struct audio_thread_add_rm_stream_msg msg;
1751
1752	assert(thread && stream);
1753
1754	init_add_rm_stream_msg(&msg, AUDIO_THREAD_DISCONNECT_STREAM, stream,
1755			       &dev, 0);
1756	return audio_thread_post_message(thread, &msg.header);
1757}
1758
1759int audio_thread_drain_stream(struct audio_thread *thread,
1760			      struct cras_rstream *stream)
1761{
1762	struct audio_thread_add_rm_stream_msg msg;
1763
1764	assert(thread && stream);
1765
1766	init_add_rm_stream_msg(&msg, AUDIO_THREAD_DRAIN_STREAM, stream,
1767			       NULL, 0);
1768	return audio_thread_post_message(thread, &msg.header);
1769}
1770
1771int audio_thread_dump_thread_info(struct audio_thread *thread,
1772				  struct audio_debug_info *info)
1773{
1774	struct audio_thread_dump_debug_info_msg msg;
1775
1776	init_dump_debug_info_msg(&msg, info);
1777	return audio_thread_post_message(thread, &msg.header);
1778}
1779
1780int audio_thread_rm_callback_sync(struct audio_thread *thread, int fd) {
1781	struct audio_thread_rm_callback_msg msg;
1782
1783	memset(&msg, 0, sizeof(msg));
1784	msg.header.id = AUDIO_THREAD_REMOVE_CALLBACK;
1785	msg.header.length = sizeof(msg);
1786	msg.fd = fd;
1787
1788	return audio_thread_post_message(thread, &msg.header);
1789}
1790
1791int audio_thread_config_global_remix(struct audio_thread *thread,
1792				     unsigned int num_channels,
1793				     const float *coefficient)
1794{
1795	int err;
1796	int identity_remix = 1;
1797	unsigned int i, j;
1798	struct audio_thread_config_global_remix msg;
1799	void *rsp;
1800
1801	init_config_global_remix_msg(&msg);
1802
1803	/* Check if the coefficients represent an identity matrix for remix
1804	 * conversion, which means no remix at all. If so then leave the
1805	 * converter as NULL. */
1806	for (i = 0; i < num_channels; i++) {
1807		if (coefficient[i * num_channels + i] != 1.0f) {
1808			identity_remix = 0;
1809			break;
1810		}
1811		for (j = i + 1; j < num_channels; j++) {
1812			if (coefficient[i * num_channels + j] != 0 ||
1813			    coefficient[j * num_channels + i] != 0)
1814				identity_remix = 0;
1815				break;
1816		}
1817	}
1818
1819	if (!identity_remix) {
1820		msg.fmt_conv = cras_channel_remix_conv_create(num_channels,
1821							      coefficient);
1822		if (NULL == msg.fmt_conv)
1823			return -ENOMEM;
1824	}
1825
1826	err = write(thread->to_thread_fds[1], &msg, msg.header.length);
1827	if (err < 0) {
1828		syslog(LOG_ERR, "Failed to post message to thread.");
1829		return err;
1830	}
1831	/* Synchronous action, wait for response. */
1832	err = read(thread->to_main_fds[0], &rsp, sizeof(rsp));
1833	if (err < 0) {
1834		syslog(LOG_ERR, "Failed to read reply from thread.");
1835		return err;
1836	}
1837
1838	if (rsp)
1839		cras_fmt_conv_destroy((struct cras_fmt_conv *)rsp);
1840	return 0;
1841}
1842
1843struct cras_fmt_conv *audio_thread_get_global_remix_converter()
1844{
1845	return remix_converter;
1846}
1847
1848struct audio_thread *audio_thread_create()
1849{
1850	int rc;
1851	struct audio_thread *thread;
1852
1853	thread = (struct audio_thread *)calloc(1, sizeof(*thread));
1854	if (!thread)
1855		return NULL;
1856
1857	thread->to_thread_fds[0] = -1;
1858	thread->to_thread_fds[1] = -1;
1859	thread->to_main_fds[0] = -1;
1860	thread->to_main_fds[1] = -1;
1861
1862	/* Two way pipes for communication with the device's audio thread. */
1863	rc = pipe(thread->to_thread_fds);
1864	if (rc < 0) {
1865		syslog(LOG_ERR, "Failed to pipe");
1866		free(thread);
1867		return NULL;
1868	}
1869	rc = pipe(thread->to_main_fds);
1870	if (rc < 0) {
1871		syslog(LOG_ERR, "Failed to pipe");
1872		free(thread);
1873		return NULL;
1874	}
1875
1876	atlog = audio_thread_event_log_init();
1877
1878	return thread;
1879}
1880
1881int audio_thread_add_open_dev(struct audio_thread *thread,
1882				struct cras_iodev *dev)
1883{
1884	struct audio_thread_open_device_msg msg;
1885
1886	assert(thread && dev);
1887
1888	if (!thread->started)
1889		return -EINVAL;
1890
1891	init_open_device_msg(&msg, AUDIO_THREAD_ADD_OPEN_DEV, dev);
1892	return audio_thread_post_message(thread, &msg.header);
1893}
1894
1895int audio_thread_rm_open_dev(struct audio_thread *thread,
1896			     struct cras_iodev *dev)
1897{
1898	struct audio_thread_open_device_msg msg;
1899
1900	assert(thread && dev);
1901	if (!thread->started)
1902		return -EINVAL;
1903
1904	init_open_device_msg(&msg, AUDIO_THREAD_RM_OPEN_DEV, dev);
1905	return audio_thread_post_message(thread, &msg.header);
1906}
1907
1908int audio_thread_dev_start_ramp(struct audio_thread *thread,
1909				struct cras_iodev *dev,
1910				enum CRAS_IODEV_RAMP_REQUEST request)
1911{
1912	struct audio_thread_dev_start_ramp_msg msg;
1913
1914	assert(thread && dev);
1915
1916	if (!thread->started)
1917		return -EINVAL;
1918
1919	init_device_start_ramp_msg(&msg, AUDIO_THREAD_DEV_START_RAMP,
1920				   dev, request);
1921	return audio_thread_post_message(thread, &msg.header);
1922}
1923
1924int audio_thread_start(struct audio_thread *thread)
1925{
1926	int rc;
1927
1928	rc = pthread_create(&thread->tid, NULL, audio_io_thread, thread);
1929	if (rc) {
1930		syslog(LOG_ERR, "Failed pthread_create");
1931		return rc;
1932	}
1933
1934	thread->started = 1;
1935
1936	return 0;
1937}
1938
1939void audio_thread_destroy(struct audio_thread *thread)
1940{
1941	audio_thread_event_log_deinit(atlog);
1942
1943	if (thread->started) {
1944		struct audio_thread_msg msg;
1945
1946		msg.id = AUDIO_THREAD_STOP;
1947		msg.length = sizeof(msg);
1948		audio_thread_post_message(thread, &msg);
1949		pthread_join(thread->tid, NULL);
1950	}
1951
1952	if (thread->to_thread_fds[0] != -1) {
1953		close(thread->to_thread_fds[0]);
1954		close(thread->to_thread_fds[1]);
1955	}
1956	if (thread->to_main_fds[0] != -1) {
1957		close(thread->to_main_fds[0]);
1958		close(thread->to_main_fds[1]);
1959	}
1960
1961	if (remix_converter)
1962		cras_fmt_conv_destroy(remix_converter);
1963
1964	free(thread);
1965}
1966