ProgramDataManager.java revision 0cc0713c1bf8027642987b750b80217569d2932a
1/*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.tv.data;
18
19import android.content.ContentResolver;
20import android.content.Context;
21import android.database.ContentObserver;
22import android.database.Cursor;
23import android.media.tv.TvContract;
24import android.media.tv.TvContract.Programs;
25import android.net.Uri;
26import android.os.Handler;
27import android.os.Looper;
28import android.os.Message;
29import android.support.annotation.AnyThread;
30import android.support.annotation.MainThread;
31import android.support.annotation.VisibleForTesting;
32import android.util.ArraySet;
33import android.util.Log;
34import android.util.LongSparseArray;
35import android.util.LruCache;
36import com.android.tv.TvSingletons;
37import com.android.tv.common.SoftPreconditions;
38import com.android.tv.common.config.api.RemoteConfig;
39import com.android.tv.common.config.api.RemoteConfigValue;
40import com.android.tv.common.memory.MemoryManageable;
41import com.android.tv.common.util.Clock;
42import com.android.tv.data.api.Channel;
43import com.android.tv.util.AsyncDbTask;
44import com.android.tv.util.MultiLongSparseArray;
45import com.android.tv.util.Utils;
46import java.util.ArrayList;
47import java.util.Collections;
48import java.util.HashMap;
49import java.util.HashSet;
50import java.util.List;
51import java.util.ListIterator;
52import java.util.Map;
53import java.util.Objects;
54import java.util.Set;
55import java.util.concurrent.ConcurrentHashMap;
56import java.util.concurrent.Executor;
57import java.util.concurrent.TimeUnit;
58
59@MainThread
60public class ProgramDataManager implements MemoryManageable {
    private static final String TAG = "ProgramDataManager";
    private static final boolean DEBUG = false;

    // To avoid running too many program update operations at the same time, a channel that has no
    // current program is queried again after a random interval between
    // PERIODIC_PROGRAM_UPDATE_MIN_MS and PERIODIC_PROGRAM_UPDATE_MAX_MS.
    @VisibleForTesting
    static final long PERIODIC_PROGRAM_UPDATE_MIN_MS = TimeUnit.MINUTES.toMillis(5);

    private static final long PERIODIC_PROGRAM_UPDATE_MAX_MS = TimeUnit.MINUTES.toMillis(10);
    // Default minimum wait between two prefetch runs; see mProgramPrefetchUpdateWaitMs.
    private static final long PROGRAM_PREFETCH_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5);
    // TODO: need to optimize consecutive DB updates.
    // Delay before retrying a "current programs" update while another one is still running.
    private static final long CURRENT_PROGRAM_UPDATE_WAIT_MS = TimeUnit.SECONDS.toMillis(5);
    // Granularity to which the prefetch time window and the next prefetch time are snapped.
    @VisibleForTesting static final long PROGRAM_GUIDE_SNAP_TIME_MS = TimeUnit.MINUTES.toMillis(30);
    // Length, in hours, of the time window loaded by a prefetch.
    private static final RemoteConfigValue<Long> PROGRAM_GUIDE_MAX_HOURS =
            RemoteConfigValue.create("live_channels_program_guide_max_hours", 48);

    // TODO: Use TvContract constants, once they become public.
    private static final String PARAM_START_TIME = "start_time";
    private static final String PARAM_END_TIME = "end_time";
    // COLUMN_CHANNEL_ID and COLUMN_END_TIME_UTC_MILLIS are added to detect duplicated programs.
    // With this sort order, duplicated programs are always consecutive.
    private static final String SORT_BY_TIME =
            Programs.COLUMN_START_TIME_UTC_MILLIS
                    + ", "
                    + Programs.COLUMN_CHANNEL_ID
                    + ", "
                    + Programs.COLUMN_END_TIME_UTC_MILLIS;

    // Message codes handled by MyHandler.
    private static final int MSG_UPDATE_CURRENT_PROGRAMS = 1000;
    private static final int MSG_UPDATE_ONE_CURRENT_PROGRAM = 1001;
    private static final int MSG_UPDATE_PREFETCH_PROGRAM = 1002;

    private final Clock mClock;
    private final ContentResolver mContentResolver;
    private final Executor mDbExecutor;
    private final RemoteConfig mRemoteConfig;
    // True between start() and stop().
    private boolean mStarted;
    // Updated only on the main thread.
    private volatile boolean mCurrentProgramsLoadFinished;
    // The running "all channels" current program query, or null if none is running.
    private ProgramsUpdateTask mProgramsUpdateTask;
    // Single-channel current program queries that have been started, keyed by channel ID.
    private final LongSparseArray<UpdateCurrentProgramForChannelTask> mProgramUpdateTaskMap =
            new LongSparseArray<>();
    // Current program of each channel, keyed by channel ID.
    private final Map<Long, Program> mChannelIdCurrentProgramMap = new ConcurrentHashMap<>();
    // Listeners keyed by channel ID; Channel.INVALID_ID holds listeners for all channels.
    private final MultiLongSparseArray<OnCurrentProgramUpdatedListener>
            mChannelId2ProgramUpdatedListeners = new MultiLongSparseArray<>();
    private final Handler mHandler;
    private final Set<Listener> mListeners = new ArraySet<>();

    // Observes Programs.CONTENT_URI while the manager is started.
    private final ContentObserver mProgramObserver;

    private boolean mPrefetchEnabled;
    // Minimum time between the last successful prefetch and the start of the next one.
    private long mProgramPrefetchUpdateWaitMs;
    // Time at which the last successful prefetch was applied; reset to 0 when prefetch is enabled.
    private long mLastPrefetchTaskRunMs;
    // The running prefetch task, or null if none is running.
    private ProgramsPrefetchTask mProgramsPrefetchTask;
    // Prefetched programs of each channel, keyed by channel ID.
    private Map<Long, ArrayList<Program>> mChannelIdProgramCache = new HashMap<>();

    // Any program that ends at or before this time is removed from the cache
    // when a channel's current program is updated.
    // Note that there's no limit for end time.
    private long mPrefetchTimeRangeStartMs;

    private boolean mPauseProgramUpdate = false;
    // Zero-length dummy programs used as binary search keys, keyed by their time.
    private final LruCache<Long, Program> mZeroLengthProgramCache = new LruCache<>(10);
124
    /**
     * Creates a manager that uses the DB executor and remote config provided by {@link
     * TvSingletons}, the content resolver of {@code context}, the system clock, and the looper of
     * the calling thread ({@link Looper#myLooper()}).
     *
     * @param context context used to look up the singletons and the content resolver
     */
    @MainThread
    public ProgramDataManager(Context context) {
        this(
                TvSingletons.getSingletons(context).getDbExecutor(),
                context.getContentResolver(),
                Clock.SYSTEM,
                Looper.myLooper(),
                TvSingletons.getSingletons(context).getRemoteConfig());
    }
134
135    @VisibleForTesting
136    ProgramDataManager(
137            Executor executor,
138            ContentResolver contentResolver,
139            Clock time,
140            Looper looper,
141            RemoteConfig remoteConfig) {
142        mDbExecutor = executor;
143        mClock = time;
144        mContentResolver = contentResolver;
145        mHandler = new MyHandler(looper);
146        mRemoteConfig = remoteConfig;
147        mProgramObserver =
148                new ContentObserver(mHandler) {
149                    @Override
150                    public void onChange(boolean selfChange) {
151                        if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) {
152                            mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS);
153                        }
154                        if (isProgramUpdatePaused()) {
155                            return;
156                        }
157                        if (mPrefetchEnabled) {
158                            // The delay time of an existing MSG_UPDATE_PREFETCH_PROGRAM could be
159                            // quite long
160                            // up to PROGRAM_GUIDE_SNAP_TIME_MS. So we need to remove the existing
161                            // message
162                            // and send MSG_UPDATE_PREFETCH_PROGRAM again.
163                            mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM);
164                            mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
165                        }
166                    }
167                };
168        mProgramPrefetchUpdateWaitMs = PROGRAM_PREFETCH_UPDATE_WAIT_MS;
169    }
170
    /** Returns the observer that this manager registers on the programs URI in {@link #start()}. */
    @VisibleForTesting
    ContentObserver getContentObserver() {
        return mProgramObserver;
    }
175
    /**
     * Sets the program prefetch update wait, i.e. the minimum delay between the last successful
     * prefetch and the next query of all programs from the DB. It prevents too frequent DB
     * queries. The default value is {@link #PROGRAM_PREFETCH_UPDATE_WAIT_MS}.
     *
     * @param programPrefetchUpdateWaitMs the wait time in milliseconds
     */
    @VisibleForTesting
    void setProgramPrefetchUpdateWait(long programPrefetchUpdateWaitMs) {
        mProgramPrefetchUpdateWaitMs = programPrefetchUpdateWaitMs;
    }
185
186    /** Starts the manager. */
187    public void start() {
188        if (mStarted) {
189            return;
190        }
191        mStarted = true;
192        // Should be called directly instead of posting MSG_UPDATE_CURRENT_PROGRAMS message
193        // to the handler. If not, another DB task can be executed before loading current programs.
194        handleUpdateCurrentPrograms();
195        if (mPrefetchEnabled) {
196            mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
197        }
198        mContentResolver.registerContentObserver(Programs.CONTENT_URI, true, mProgramObserver);
199    }
200
201    /**
202     * Stops the manager. It clears manager states and runs pending DB operations. Added listeners
203     * aren't automatically removed by this method.
204     */
205    @VisibleForTesting
206    public void stop() {
207        if (!mStarted) {
208            return;
209        }
210        mStarted = false;
211        mContentResolver.unregisterContentObserver(mProgramObserver);
212        mHandler.removeCallbacksAndMessages(null);
213
214        clearTask(mProgramUpdateTaskMap);
215        cancelPrefetchTask();
216        if (mProgramsUpdateTask != null) {
217            mProgramsUpdateTask.cancel(true);
218            mProgramsUpdateTask = null;
219        }
220    }
221
    /**
     * Returns {@code true} once a query of the current programs of all channels has completed
     * since this manager was created.
     */
    @AnyThread
    public boolean isCurrentProgramsLoadFinished() {
        return mCurrentProgramsLoadFinished;
    }
226
    /**
     * Returns the current program at the specified channel.
     *
     * @param channelId ID of the channel to look up
     * @return the cached current program, or {@code null} if none is cached for the channel
     */
    @AnyThread
    public Program getCurrentProgram(long channelId) {
        return mChannelIdCurrentProgramMap.get(channelId);
    }
232
    /**
     * Returns all the current programs.
     *
     * @return a new list holding a copy of the cached current programs; modifying it doesn't
     *     affect this manager
     */
    @AnyThread
    public List<Program> getCurrentPrograms() {
        return new ArrayList<>(mChannelIdCurrentProgramMap.values());
    }
238
239    /** Reloads program data. */
240    public void reload() {
241        if (!mHandler.hasMessages(MSG_UPDATE_CURRENT_PROGRAMS)) {
242            mHandler.sendEmptyMessage(MSG_UPDATE_CURRENT_PROGRAMS);
243        }
244        if (mPrefetchEnabled && !mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
245            mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
246        }
247    }
248
    /** A listener interface to receive notification on program data retrieval from DB. */
    public interface Listener {
        /**
         * Called when program data that wasn't available before is now available through {@link
         * ProgramDataManager#getPrograms(long, long)} after the DB operation is done.
         *
         * <p>NOTE(review): the original comment said this would be called only if the fetched data
         * is around the selected program; the visible code notifies after every successful
         * prefetch. Confirm which is intended.
         */
        void onProgramUpdated();
    }
258
    /**
     * Adds the {@link Listener}. Listeners are kept in a set, so adding the same listener twice
     * has no additional effect.
     */
    public void addListener(Listener listener) {
        mListeners.add(listener);
    }
263
    /** Removes the {@link Listener} previously added by {@link #addListener(Listener)}. */
    public void removeListener(Listener listener) {
        mListeners.remove(listener);
    }
268
269    /** Enables or Disables program prefetch. */
270    public void setPrefetchEnabled(boolean enable) {
271        if (mPrefetchEnabled == enable) {
272            return;
273        }
274        if (enable) {
275            mPrefetchEnabled = true;
276            mLastPrefetchTaskRunMs = 0;
277            if (mStarted) {
278                mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
279            }
280        } else {
281            mPrefetchEnabled = false;
282            cancelPrefetchTask();
283            mChannelIdProgramCache.clear();
284            mHandler.removeMessages(MSG_UPDATE_PREFETCH_PROGRAM);
285        }
286    }
287
288    /**
289     * Returns the programs for the given channel which ends after the given start time.
290     *
291     * <p>Prefetch should be enabled to call it.
292     *
293     * @return {@link List} with Programs. It may includes dummy program if the entry needs DB
294     *     operations to get.
295     */
296    public List<Program> getPrograms(long channelId, long startTime) {
297        SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
298        ArrayList<Program> cachedPrograms = mChannelIdProgramCache.get(channelId);
299        if (cachedPrograms == null) {
300            return Collections.emptyList();
301        }
302        int startIndex = getProgramIndexAt(cachedPrograms, startTime);
303        return Collections.unmodifiableList(
304                cachedPrograms.subList(startIndex, cachedPrograms.size()));
305    }
306
307    /**
308     * Returns the index of program that is played at the specified time.
309     *
310     * <p>If there isn't, return the first program among programs that starts after the given time
311     * if returnNextProgram is {@code true}.
312     */
313    private int getProgramIndexAt(List<Program> programs, long time) {
314        Program key = mZeroLengthProgramCache.get(time);
315        if (key == null) {
316            key = createDummyProgram(time, time);
317            mZeroLengthProgramCache.put(time, key);
318        }
319        int index = Collections.binarySearch(programs, key);
320        if (index < 0) {
321            index = -(index + 1); // change it to index to be added.
322            if (index > 0 && isProgramPlayedAt(programs.get(index - 1), time)) {
323                // A program is played at that time.
324                return index - 1;
325            }
326            return index;
327        }
328        return index;
329    }
330
331    private boolean isProgramPlayedAt(Program program, long time) {
332        return program.getStartTimeUtcMillis() <= time && time <= program.getEndTimeUtcMillis();
333    }
334
    /**
     * Adds the listener to be notified if current program is updated for a channel.
     *
     * @param channelId A channel ID to get notified. If it's {@link Channel#INVALID_ID}, the
     *     listener would be called whenever a current program is updated.
     * @param listener the listener to register for {@code channelId}
     */
    public void addOnCurrentProgramUpdatedListener(
            long channelId, OnCurrentProgramUpdatedListener listener) {
        mChannelId2ProgramUpdatedListeners.put(channelId, listener);
    }
345
    /**
     * Removes the listener previously added by {@link #addOnCurrentProgramUpdatedListener(long,
     * OnCurrentProgramUpdatedListener)}.
     *
     * @param channelId the channel ID the listener was registered with
     * @param listener the listener to remove
     */
    public void removeOnCurrentProgramUpdatedListener(
            long channelId, OnCurrentProgramUpdatedListener listener) {
        mChannelId2ProgramUpdatedListeners.remove(channelId, listener);
    }
354
355    private void notifyCurrentProgramUpdate(long channelId, Program program) {
356        for (OnCurrentProgramUpdatedListener listener :
357                mChannelId2ProgramUpdatedListeners.get(channelId)) {
358            listener.onCurrentProgramUpdated(channelId, program);
359        }
360        for (OnCurrentProgramUpdatedListener listener :
361                mChannelId2ProgramUpdatedListeners.get(Channel.INVALID_ID)) {
362            listener.onCurrentProgramUpdated(channelId, program);
363        }
364    }
365
366    private void updateCurrentProgram(long channelId, Program program) {
367        Program previousProgram =
368                program == null
369                        ? mChannelIdCurrentProgramMap.remove(channelId)
370                        : mChannelIdCurrentProgramMap.put(channelId, program);
371        if (!Objects.equals(program, previousProgram)) {
372            if (mPrefetchEnabled) {
373                removePreviousProgramsAndUpdateCurrentProgramInCache(channelId, program);
374            }
375            notifyCurrentProgramUpdate(channelId, program);
376        }
377
378        long delayedTime;
379        if (program == null) {
380            delayedTime =
381                    PERIODIC_PROGRAM_UPDATE_MIN_MS
382                            + (long)
383                                    (Math.random()
384                                            * (PERIODIC_PROGRAM_UPDATE_MAX_MS
385                                                    - PERIODIC_PROGRAM_UPDATE_MIN_MS));
386        } else {
387            delayedTime = program.getEndTimeUtcMillis() - mClock.currentTimeMillis();
388        }
389        mHandler.sendMessageDelayed(
390                mHandler.obtainMessage(MSG_UPDATE_ONE_CURRENT_PROGRAM, channelId), delayedTime);
391    }
392
    /**
     * Trims the cached programs of a channel and puts {@code currentProgram} into the cache.
     *
     * <p>Cached programs that end at or before {@code mPrefetchTimeRangeStartMs} are removed. The
     * first remaining cached entry that ends after the start of {@code currentProgram} is replaced
     * by {@code currentProgram}; the parts of that entry's time span that lie before or after
     * {@code currentProgram} are filled with dummy programs. Nothing is changed if {@code
     * currentProgram} isn't valid or if the channel has no cache entry.
     *
     * @param channelId ID of the channel whose cache is updated
     * @param currentProgram the channel's new current program
     */
    private void removePreviousProgramsAndUpdateCurrentProgramInCache(
            long channelId, Program currentProgram) {
        SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
        if (!Program.isProgramValid(currentProgram)) {
            return;
        }
        // The list is taken out of the cache while it is edited and put back at the end.
        ArrayList<Program> cachedPrograms = mChannelIdProgramCache.remove(channelId);
        if (cachedPrograms == null) {
            return;
        }
        ListIterator<Program> i = cachedPrograms.listIterator();
        while (i.hasNext()) {
            Program cachedProgram = i.next();
            if (cachedProgram.getEndTimeUtcMillis() <= mPrefetchTimeRangeStartMs) {
                // Remove previous programs which will not be shown in program guide.
                i.remove();
                continue;
            }

            if (cachedProgram.getEndTimeUtcMillis() <= currentProgram.getStartTimeUtcMillis()) {
                // Keep the programs that ends earlier than current program
                // but later than mPrefetchTimeRangeStartMs.
                continue;
            }

            // Update dummy program around current program if any.
            // NOTE(review): the entry reached here is treated as a dummy program, but the code
            // doesn't check that it actually is one.
            if (cachedProgram.getStartTimeUtcMillis() < currentProgram.getStartTimeUtcMillis()) {
                // The dummy program starts earlier than the current program. Adjust its end time.
                i.set(
                        createDummyProgram(
                                cachedProgram.getStartTimeUtcMillis(),
                                currentProgram.getStartTimeUtcMillis()));
                i.add(currentProgram);
            } else {
                i.set(currentProgram);
            }
            if (currentProgram.getEndTimeUtcMillis() < cachedProgram.getEndTimeUtcMillis()) {
                // The dummy program ends later than the current program. Adjust its start time.
                i.add(
                        createDummyProgram(
                                currentProgram.getEndTimeUtcMillis(),
                                cachedProgram.getEndTimeUtcMillis()));
            }
            // Only the first overlapping entry is rewritten; later entries stay untouched.
            break;
        }
        if (cachedPrograms.isEmpty()) {
            // If all the cached programs finish before mPrefetchTimeRangeStartMs, the
            // currentProgram would not have a chance to be inserted to the cache.
            cachedPrograms.add(currentProgram);
        }
        mChannelIdProgramCache.put(channelId, cachedPrograms);
    }
445
446    private void handleUpdateCurrentPrograms() {
447        if (mProgramsUpdateTask != null) {
448            mHandler.sendEmptyMessageDelayed(
449                    MSG_UPDATE_CURRENT_PROGRAMS, CURRENT_PROGRAM_UPDATE_WAIT_MS);
450            return;
451        }
452        clearTask(mProgramUpdateTaskMap);
453        mHandler.removeMessages(MSG_UPDATE_ONE_CURRENT_PROGRAM);
454        mProgramsUpdateTask = new ProgramsUpdateTask(mContentResolver, mClock.currentTimeMillis());
455        mProgramsUpdateTask.executeOnDbThread();
456    }
457
458    private class ProgramsPrefetchTask
459            extends AsyncDbTask<Void, Void, Map<Long, ArrayList<Program>>> {
460        private final long mStartTimeMs;
461        private final long mEndTimeMs;
462
463        private boolean mSuccess;
464
465        public ProgramsPrefetchTask() {
466            super(mDbExecutor);
467            long time = mClock.currentTimeMillis();
468            mStartTimeMs =
469                    Utils.floorTime(time - PROGRAM_GUIDE_SNAP_TIME_MS, PROGRAM_GUIDE_SNAP_TIME_MS);
470            mEndTimeMs =
471                    mStartTimeMs
472                            + TimeUnit.HOURS.toMillis(PROGRAM_GUIDE_MAX_HOURS.get(mRemoteConfig));
473            mSuccess = false;
474        }
475
476        @Override
477        protected Map<Long, ArrayList<Program>> doInBackground(Void... params) {
478            Map<Long, ArrayList<Program>> programMap = new HashMap<>();
479            if (DEBUG) {
480                Log.d(
481                        TAG,
482                        "Starts programs prefetch. "
483                                + Utils.toTimeString(mStartTimeMs)
484                                + "-"
485                                + Utils.toTimeString(mEndTimeMs));
486            }
487            Uri uri =
488                    Programs.CONTENT_URI
489                            .buildUpon()
490                            .appendQueryParameter(PARAM_START_TIME, String.valueOf(mStartTimeMs))
491                            .appendQueryParameter(PARAM_END_TIME, String.valueOf(mEndTimeMs))
492                            .build();
493            final int RETRY_COUNT = 3;
494            Program lastReadProgram = null;
495            for (int retryCount = RETRY_COUNT; retryCount > 0; retryCount--) {
496                if (isProgramUpdatePaused()) {
497                    return null;
498                }
499                programMap.clear();
500                try (Cursor c =
501                        mContentResolver.query(uri, Program.PROJECTION, null, null, SORT_BY_TIME)) {
502                    if (c == null) {
503                        continue;
504                    }
505                    while (c.moveToNext()) {
506                        int duplicateCount = 0;
507                        if (isCancelled()) {
508                            if (DEBUG) {
509                                Log.d(TAG, "ProgramsPrefetchTask canceled.");
510                            }
511                            return null;
512                        }
513                        Program program = Program.fromCursor(c);
514                        if (Program.isDuplicate(program, lastReadProgram)) {
515                            duplicateCount++;
516                            continue;
517                        } else {
518                            lastReadProgram = program;
519                        }
520                        ArrayList<Program> programs = programMap.get(program.getChannelId());
521                        if (programs == null) {
522                            programs = new ArrayList<>();
523                            programMap.put(program.getChannelId(), programs);
524                        }
525                        programs.add(program);
526                        if (duplicateCount > 0) {
527                            Log.w(TAG, "Found " + duplicateCount + " duplicate programs");
528                        }
529                    }
530                    mSuccess = true;
531                    break;
532                } catch (IllegalStateException e) {
533                    if (DEBUG) {
534                        Log.d(TAG, "Database is changed while querying. Will retry.");
535                    }
536                } catch (SecurityException e) {
537                    Log.d(TAG, "Security exception during program data query", e);
538                }
539            }
540            if (DEBUG) {
541                Log.d(TAG, "Ends programs prefetch for " + programMap.size() + " channels");
542            }
543            return programMap;
544        }
545
546        @Override
547        protected void onPostExecute(Map<Long, ArrayList<Program>> programs) {
548            mProgramsPrefetchTask = null;
549            if (isProgramUpdatePaused()) {
550                // ProgramsPrefetchTask will run again once setPauseProgramUpdate(false) is called.
551                return;
552            }
553            long nextMessageDelayedTime;
554            if (mSuccess) {
555                mChannelIdProgramCache = programs;
556                notifyProgramUpdated();
557                long currentTime = mClock.currentTimeMillis();
558                mLastPrefetchTaskRunMs = currentTime;
559                nextMessageDelayedTime =
560                        Utils.floorTime(
561                                        mLastPrefetchTaskRunMs + PROGRAM_GUIDE_SNAP_TIME_MS,
562                                        PROGRAM_GUIDE_SNAP_TIME_MS)
563                                - currentTime;
564            } else {
565                nextMessageDelayedTime = PERIODIC_PROGRAM_UPDATE_MIN_MS;
566            }
567            if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
568                mHandler.sendEmptyMessageDelayed(
569                        MSG_UPDATE_PREFETCH_PROGRAM, nextMessageDelayedTime);
570            }
571        }
572    }
573
    /** Calls {@link Listener#onProgramUpdated()} on every registered {@link Listener}. */
    private void notifyProgramUpdated() {
        for (Listener listener : mListeners) {
            listener.onProgramUpdated();
        }
    }
579
580    private class ProgramsUpdateTask extends AsyncDbTask.AsyncQueryTask<List<Program>> {
581        public ProgramsUpdateTask(ContentResolver contentResolver, long time) {
582            super(
583                    mDbExecutor,
584                    contentResolver,
585                    Programs.CONTENT_URI
586                            .buildUpon()
587                            .appendQueryParameter(PARAM_START_TIME, String.valueOf(time))
588                            .appendQueryParameter(PARAM_END_TIME, String.valueOf(time))
589                            .build(),
590                    Program.PROJECTION,
591                    null,
592                    null,
593                    SORT_BY_TIME);
594        }
595
596        @Override
597        public List<Program> onQuery(Cursor c) {
598            final List<Program> programs = new ArrayList<>();
599            if (c != null) {
600                int duplicateCount = 0;
601                Program lastReadProgram = null;
602                while (c.moveToNext()) {
603                    if (isCancelled()) {
604                        return programs;
605                    }
606                    Program program = Program.fromCursor(c);
607                    if (Program.isDuplicate(program, lastReadProgram)) {
608                        duplicateCount++;
609                        continue;
610                    } else {
611                        lastReadProgram = program;
612                    }
613                    programs.add(program);
614                }
615                if (duplicateCount > 0) {
616                    Log.w(TAG, "Found " + duplicateCount + " duplicate programs");
617                }
618            }
619            return programs;
620        }
621
622        @Override
623        protected void onPostExecute(List<Program> programs) {
624            if (DEBUG) Log.d(TAG, "ProgramsUpdateTask done");
625            mProgramsUpdateTask = null;
626            if (programs != null) {
627                Set<Long> removedChannelIds = new HashSet<>(mChannelIdCurrentProgramMap.keySet());
628                for (Program program : programs) {
629                    long channelId = program.getChannelId();
630                    updateCurrentProgram(channelId, program);
631                    removedChannelIds.remove(channelId);
632                }
633                for (Long channelId : removedChannelIds) {
634                    if (mPrefetchEnabled) {
635                        mChannelIdProgramCache.remove(channelId);
636                    }
637                    mChannelIdCurrentProgramMap.remove(channelId);
638                    notifyCurrentProgramUpdate(channelId, null);
639                }
640            }
641            mCurrentProgramsLoadFinished = true;
642        }
643    }
644
645    private class UpdateCurrentProgramForChannelTask extends AsyncDbTask.AsyncQueryTask<Program> {
646        private final long mChannelId;
647
648        private UpdateCurrentProgramForChannelTask(
649                ContentResolver contentResolver, long channelId, long time) {
650            super(
651                    mDbExecutor,
652                    contentResolver,
653                    TvContract.buildProgramsUriForChannel(channelId, time, time),
654                    Program.PROJECTION,
655                    null,
656                    null,
657                    SORT_BY_TIME);
658            mChannelId = channelId;
659        }
660
661        @Override
662        public Program onQuery(Cursor c) {
663            Program program = null;
664            if (c != null && c.moveToNext()) {
665                program = Program.fromCursor(c);
666            }
667            return program;
668        }
669
670        @Override
671        protected void onPostExecute(Program program) {
672            mProgramUpdateTaskMap.remove(mChannelId);
673            updateCurrentProgram(mChannelId, program);
674        }
675    }
676
677    private class MyHandler extends Handler {
678        public MyHandler(Looper looper) {
679            super(looper);
680        }
681
682        @Override
683        public void handleMessage(Message msg) {
684            switch (msg.what) {
685                case MSG_UPDATE_CURRENT_PROGRAMS:
686                    handleUpdateCurrentPrograms();
687                    break;
688                case MSG_UPDATE_ONE_CURRENT_PROGRAM:
689                    {
690                        long channelId = (Long) msg.obj;
691                        UpdateCurrentProgramForChannelTask oldTask =
692                                mProgramUpdateTaskMap.get(channelId);
693                        if (oldTask != null) {
694                            oldTask.cancel(true);
695                        }
696                        UpdateCurrentProgramForChannelTask task =
697                                new UpdateCurrentProgramForChannelTask(
698                                        mContentResolver, channelId, mClock.currentTimeMillis());
699                        mProgramUpdateTaskMap.put(channelId, task);
700                        task.executeOnDbThread();
701                        break;
702                    }
703                case MSG_UPDATE_PREFETCH_PROGRAM:
704                    {
705                        if (isProgramUpdatePaused()) {
706                            return;
707                        }
708                        if (mProgramsPrefetchTask != null) {
709                            mHandler.sendEmptyMessageDelayed(
710                                    msg.what, mProgramPrefetchUpdateWaitMs);
711                            return;
712                        }
713                        long delayMillis =
714                                mLastPrefetchTaskRunMs
715                                        + mProgramPrefetchUpdateWaitMs
716                                        - mClock.currentTimeMillis();
717                        if (delayMillis > 0) {
718                            mHandler.sendEmptyMessageDelayed(
719                                    MSG_UPDATE_PREFETCH_PROGRAM, delayMillis);
720                        } else {
721                            mProgramsPrefetchTask = new ProgramsPrefetchTask();
722                            mProgramsPrefetchTask.executeOnDbThread();
723                        }
724                        break;
725                    }
726                default:
727                    // Do nothing
728            }
729        }
730    }
731
732    /**
733     * Pause program update. Updating program data will result in UI refresh, but UI is fragile to
734     * handle it so we'd better disable it for a while.
735     *
736     * <p>Prefetch should be enabled to call it.
737     */
738    public void setPauseProgramUpdate(boolean pauseProgramUpdate) {
739        SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
740        if (mPauseProgramUpdate && !pauseProgramUpdate) {
741            if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
742                // MSG_UPDATE_PRFETCH_PROGRAM can be empty
743                // if prefetch task is launched while program update is paused.
744                // Update immediately in that case.
745                mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
746            }
747        }
748        mPauseProgramUpdate = pauseProgramUpdate;
749    }
750
    /**
     * Returns whether program updates are effectively paused, i.e. a pause was requested and the
     * prefetch cache isn't empty.
     */
    private boolean isProgramUpdatePaused() {
        // Although pause is requested, we need to keep updating if cache is empty.
        return mPauseProgramUpdate && !mChannelIdProgramCache.isEmpty();
    }
755
756    /**
757     * Sets program data prefetch time range. Any program data that ends before the start time will
758     * be removed from the cache later. Note that there's no limit for end time.
759     *
760     * <p>Prefetch should be enabled to call it.
761     */
762    public void setPrefetchTimeRange(long startTimeMs) {
763        SoftPreconditions.checkState(mPrefetchEnabled, TAG, "Prefetch is disabled.");
764        if (mPrefetchTimeRangeStartMs > startTimeMs) {
765            // Fetch the programs immediately to re-create the cache.
766            if (!mHandler.hasMessages(MSG_UPDATE_PREFETCH_PROGRAM)) {
767                mHandler.sendEmptyMessage(MSG_UPDATE_PREFETCH_PROGRAM);
768            }
769        }
770        mPrefetchTimeRangeStartMs = startTimeMs;
771    }
772
773    private void clearTask(LongSparseArray<UpdateCurrentProgramForChannelTask> tasks) {
774        for (int i = 0; i < tasks.size(); i++) {
775            tasks.valueAt(i).cancel(true);
776        }
777        tasks.clear();
778    }
779
780    private void cancelPrefetchTask() {
781        if (mProgramsPrefetchTask != null) {
782            mProgramsPrefetchTask.cancel(true);
783            mProgramsPrefetchTask = null;
784        }
785    }
786
    /**
     * Creates a dummy program which indicates data isn't loaded yet so a DB query is required. The
     * dummy program has {@link Channel#INVALID_ID} as its channel ID.
     *
     * @param startTimeMs start time of the dummy program in UTC milliseconds
     * @param endTimeMs end time of the dummy program in UTC milliseconds
     */
    private Program createDummyProgram(long startTimeMs, long endTimeMs) {
        return new Program.Builder()
                .setChannelId(Channel.INVALID_ID)
                .setStartTimeUtcMillis(startTimeMs)
                .setEndTimeUtcMillis(endTimeMs)
                .build();
    }
795
    /**
     * Releases memory by clearing the empty cache of the current-program listener registry. The
     * {@code level} argument isn't used.
     */
    @Override
    public void performTrimMemory(int level) {
        mChannelId2ProgramUpdatedListeners.clearEmptyCache();
    }
800}
801