1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "common_time"
18#include <utils/Log.h>
19
#include <errno.h>
#include <fcntl.h>
#include <linux/in.h>
#include <linux/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <utils/Errors.h>
#include <utils/misc.h>
29
30#include <common_time/local_clock.h>
31
32#include "common_clock.h"
33#include "diag_thread.h"
34
35#define kMaxEvents 16
36#define kListenPort 9876
37
38static bool setNonblocking(int fd) {
39    int flags = fcntl(fd, F_GETFL);
40    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
41        ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)",
42             fd, errno);
43        return false;
44    }
45
46    return true;
47}
48
49static bool setNodelay(int fd) {
50    int tmp = 1;
51    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &tmp, sizeof(tmp)) < 0) {
52        ALOGE("Failed to set socket (%d) to no-delay mode (errno %d)",
53             fd, errno);
54        return false;
55    }
56
57    return true;
58}
59
60namespace android {
61
62DiagThread::DiagThread(CommonClock* common_clock, LocalClock* local_clock) {
63    common_clock_ = common_clock;
64    local_clock_ = local_clock;
65    listen_fd_ = -1;
66    data_fd_ = -1;
67    kernel_logID_basis_known_ = false;
68    discipline_log_ID_ = 0;
69}
70
71DiagThread::~DiagThread() {
72}
73
74status_t DiagThread::startWorkThread() {
75    status_t res;
76    stopWorkThread();
77    res = run("Diag");
78
79    if (res != OK)
80        ALOGE("Failed to start work thread (res = %d)", res);
81
82    return res;
83}
84
85void DiagThread::stopWorkThread() {
86    status_t res;
87    res = requestExitAndWait(); // block until thread exit.
88    if (res != OK)
89        ALOGE("Failed to stop work thread (res = %d)", res);
90}
91
92bool DiagThread::openListenSocket() {
93    bool ret = false;
94    int flags;
95    cleanupListenSocket();
96
97    if ((listen_fd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
98        ALOGE("Socket failed.");
99        goto bailout;
100    }
101
102    // Set non-blocking operation
103    if (!setNonblocking(listen_fd_))
104        goto bailout;
105
106    struct sockaddr_in addr;
107    memset(&addr, 0, sizeof(addr));
108    addr.sin_family = AF_INET;
109    addr.sin_addr.s_addr = INADDR_ANY;
110    addr.sin_port = htons(kListenPort);
111
112    if (bind(listen_fd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
113        ALOGE("Bind failed.");
114        goto bailout;
115    }
116
117    if (listen(listen_fd_, 1) < 0) {
118        ALOGE("Listen failed.");
119        goto bailout;
120    }
121
122    ret = true;
123bailout:
124    if (!ret)
125        cleanupListenSocket();
126
127    return ret;
128}
129
130void DiagThread::cleanupListenSocket() {
131    if (listen_fd_ >= 0) {
132        int res;
133
134        struct linger l;
135        l.l_onoff  = 1;
136        l.l_linger = 0;
137
138        setsockopt(listen_fd_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
139        shutdown(listen_fd_, SHUT_RDWR);
140        close(listen_fd_);
141        listen_fd_ = -1;
142    }
143}
144
145void DiagThread::cleanupDataSocket() {
146    if (data_fd_ >= 0) {
147        int res;
148
149        struct linger l;
150        l.l_onoff  = 1;
151        l.l_linger = 0;
152
153        setsockopt(data_fd_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
154        shutdown(data_fd_, SHUT_RDWR);
155        close(data_fd_);
156        data_fd_ = -1;
157    }
158}
159
160void DiagThread::resetLogIDs() {
161    // Drain and discard all of the events from the kernel
162    struct local_time_debug_event events[kMaxEvents];
163    while(local_clock_->getDebugLog(events, kMaxEvents) > 0)
164        ;
165
166    {
167        Mutex::Autolock lock(&discipline_log_lock_);
168        discipline_log_.clear();
169        discipline_log_ID_ = 0;
170    }
171
172    kernel_logID_basis_known_ = false;
173}
174
175void DiagThread::pushDisciplineEvent(int64_t observed_local_time,
176                                     int64_t observed_common_time,
177                                     int64_t nominal_common_time,
178                                     int32_t total_correction,
179                                     int32_t rtt) {
180    Mutex::Autolock lock(&discipline_log_lock_);
181
182    DisciplineEventRecord evt;
183
184    evt.event_id = discipline_log_ID_++;
185
186    evt.action_local_time = local_clock_->getLocalTime();
187    common_clock_->localToCommon(evt.action_local_time,
188            &evt.action_common_time);
189
190    evt.observed_local_time  = observed_local_time;
191    evt.observed_common_time = observed_common_time;
192    evt.nominal_common_time  = nominal_common_time;
193    evt.total_correction     = total_correction;
194    evt.rtt                  = rtt;
195
196    discipline_log_.push_back(evt);
197    while (discipline_log_.size() > kMaxDisciplineLogSize)
198        discipline_log_.erase(discipline_log_.begin());
199}
200
201bool DiagThread::threadLoop() {
202    struct pollfd poll_fds[1];
203
204    if (!openListenSocket()) {
205        ALOGE("Failed to open listen socket");
206        goto bailout;
207    }
208
209    while (!exitPending()) {
210        memset(&poll_fds, 0, sizeof(poll_fds));
211
212        if (data_fd_ < 0) {
213            poll_fds[0].fd     = listen_fd_;
214            poll_fds[0].events = POLLIN;
215        } else {
216            poll_fds[0].fd     = data_fd_;
217            poll_fds[0].events = POLLRDHUP | POLLIN;
218        }
219
220        int poll_res = poll(poll_fds, NELEM(poll_fds), 50);
221        if (poll_res < 0) {
222            ALOGE("Fatal error (%d,%d) while waiting on events",
223                 poll_res, errno);
224            goto bailout;
225        }
226
227        if (exitPending())
228            break;
229
230        if (poll_fds[0].revents) {
231            if (poll_fds[0].fd == listen_fd_) {
232                data_fd_ = accept(listen_fd_, NULL, NULL);
233
234                if (data_fd_ < 0) {
235                    ALOGW("Failed accept on socket %d with err %d",
236                         listen_fd_, errno);
237                } else {
238                    if (!setNonblocking(data_fd_))
239                        cleanupDataSocket();
240                    if (!setNodelay(data_fd_))
241                        cleanupDataSocket();
242                }
243            } else
244                if (poll_fds[0].fd == data_fd_) {
245                    if (poll_fds[0].revents & POLLRDHUP) {
246                        // Connection hung up; time to clean up.
247                        cleanupDataSocket();
248                    } else
249                        if (poll_fds[0].revents & POLLIN) {
250                            uint8_t cmd;
251                            if (read(data_fd_, &cmd, sizeof(cmd)) > 0) {
252                                switch(cmd) {
253                                    case 'r':
254                                    case 'R':
255                                        resetLogIDs();
256                                        break;
257                                }
258                            }
259                        }
260                }
261        }
262
263        struct local_time_debug_event events[kMaxEvents];
264        int amt = local_clock_->getDebugLog(events, kMaxEvents);
265
266        if (amt > 0) {
267            for (int i = 0; i < amt; i++) {
268                struct local_time_debug_event& e = events[i];
269
270                if (!kernel_logID_basis_known_) {
271                    kernel_logID_basis_ = e.local_timesync_event_id;
272                    kernel_logID_basis_known_ = true;
273                }
274
275                char buf[1024];
276                int64_t common_time;
277                status_t res = common_clock_->localToCommon(e.local_time,
278                                                            &common_time);
279                snprintf(buf, sizeof(buf), "E,%lld,%lld,%lld,%d\n",
280                         e.local_timesync_event_id - kernel_logID_basis_,
281                         e.local_time,
282                         common_time,
283                         (OK == res) ? 1 : 0);
284                buf[sizeof(buf) - 1] = 0;
285
286                if (data_fd_ >= 0)
287                    write(data_fd_, buf, strlen(buf));
288            }
289        }
290
291        { // scope for autolock pattern
292            Mutex::Autolock lock(&discipline_log_lock_);
293
294            while (discipline_log_.size() > 0) {
295                char buf[1024];
296                DisciplineEventRecord& e = *discipline_log_.begin();
297                snprintf(buf, sizeof(buf),
298                         "D,%lld,%lld,%lld,%lld,%lld,%lld,%d,%d\n",
299                         e.event_id,
300                         e.action_local_time,
301                         e.action_common_time,
302                         e.observed_local_time,
303                         e.observed_common_time,
304                         e.nominal_common_time,
305                         e.total_correction,
306                         e.rtt);
307                buf[sizeof(buf) - 1] = 0;
308
309                if (data_fd_ >= 0)
310                    write(data_fd_, buf, strlen(buf));
311
312                discipline_log_.erase(discipline_log_.begin());
313            }
314        }
315    }
316
317bailout:
318    cleanupDataSocket();
319    cleanupListenSocket();
320    return false;
321}
322
323}  // namespace android
324