55import org .slf4j .Logger ;
66import org .slf4j .LoggerFactory ;
77
8+ import java .lang .ref .WeakReference ;
89import java .util .Optional ;
910import java .util .concurrent .*;
11+ import java .util .concurrent .atomic .AtomicReference ;
1012import java .util .function .Supplier ;
1113
1214interface RefreshJob <T > {
@@ -19,46 +21,79 @@ interface RefreshJob<T> {
1921 final class Poll <T > implements RefreshJob <T > {
2022 private final RefreshStrategy .Polling config ;
2123 private final Supplier <CompletableFuture <T >> action ;
22- private final CompletableFuture <T > output ;
24+ private final Runnable onChange ;
25+ private final CompletableFuture <T > firstOutput ;
26+ private volatile T latestOutput = null ;
2327 private ScheduledFuture <?> poll ;
2428
25- Poll (RefreshStrategy .Polling config , Supplier <CompletableFuture <T >> action ) {
29+ Poll (RefreshStrategy .Polling config , Supplier <CompletableFuture <T >> action , Runnable onChange ) {
2630 this .config = config ;
2731 this .action = action ;
28- this .output = new CompletableFuture <>();
32+ this .onChange = onChange ;
33+ this .firstOutput = new CompletableFuture <>();
2934 }
3035
3136 void start () {
3237 log .debug ("Starting polling-refresh." );
33- poll = SEXEC .schedule (
38+ WeakReference <Poll <T >> weakSelf = new WeakReference <>(this );
39+ AtomicReference <ScheduledFuture <?>> taskRef = new AtomicReference <>();
40+
41+ ScheduledFuture <?> scheduled = SEXEC .scheduleAtFixedRate (
3442 () -> {
35- var o = RefreshJob .runRefreshWithTimeout (action , config .timeout );
43+ Poll <T > self = weakSelf .get ();
44+ if (self == null ) {
45+ log .debug ("Poll referent GC'd — self-cancelling polling task." );
46+ ScheduledFuture <?> t = taskRef .get ();
47+ if (t != null ) t .cancel (false );
48+ return ;
49+ }
50+ var o = RefreshJob .runRefreshWithTimeout (self .action , self .config .timeout );
3651 if (o != null ) {
37- output .complete (o );
52+ boolean changed = !o .equals (self .latestOutput );
53+ self .latestOutput = o ;
54+ if (!self .firstOutput .isDone ()) {
55+ self .firstOutput .complete (o );
56+ }
57+ if (changed && self .onChange != null ) {
58+ try {
59+ self .onChange .run ();
60+ } catch (Exception e ) {
61+ log .error ("onChange callback error: {}" , e .getMessage ());
62+ }
63+ } else if (!changed ) {
64+ log .debug ("Output unchanged, skipping onChange callback." );
65+ }
3866 }
3967 },
68+ 0 ,
4069 config .interval ,
4170 TimeUnit .MILLISECONDS
4271 );
72+
73+ taskRef .set (scheduled );
74+ this .poll = scheduled ;
4375 }
4476
4577 @ Override
4678 public Optional <T > getOutput () {
79+ if (latestOutput != null ) {
80+ return Optional .of (latestOutput );
81+ }
4782 try {
4883 if (poll == null ) {
4984 log .warn ("Polling hasn't started but the output is being used." );
50- } else if (!poll . isCancelled () && ! output .isDone ()) {
51- return Optional .ofNullable (output .get (config .timeout , TimeUnit .MILLISECONDS ));
85+ } else if (!firstOutput .isDone ()) {
86+ return Optional .ofNullable (firstOutput .get (config .timeout , TimeUnit .MILLISECONDS ));
5287 }
5388 } catch (Exception e ) {
5489 log .warn ("Attempted to await for poll output but an exception occurred: {}" , e .toString ());
5590 }
56- return Optional .ofNullable (output . getNow ( null ) );
91+ return Optional .ofNullable (latestOutput );
5792 }
5893
5994 @ Override
6095 public void shutdown () {
61- if (!poll .isCancelled ()) {
96+ if (poll != null && !poll .isCancelled ()) {
6297 log .debug ("Shutting down polling-refresh." );
6398 poll .cancel (false );
6499 }
@@ -71,11 +106,13 @@ final class OnDemand<T> implements RefreshJob<T> {
71106 private T output = null ;
72107 private final RefreshStrategy .OnDemand config ;
73108 private final Supplier <CompletableFuture <T >> action ;
109+ private final Runnable onChange ;
74110 private boolean stopped = false ;
75111
76- OnDemand (RefreshStrategy .OnDemand config , Supplier <CompletableFuture <T >> action ) {
112+ OnDemand (RefreshStrategy .OnDemand config , Supplier <CompletableFuture <T >> action , Runnable onChange ) {
77113 this .config = config ;
78114 this .action = action ;
115+ this .onChange = onChange ;
79116 }
80117
81118 @ Override
@@ -85,8 +122,18 @@ public Optional<T> getOutput() {
85122 log .debug ("Running refresh as current output is stale." );
86123 var o = RefreshJob .runRefreshWithTimeout (action , config .timeout );
87124 if (o != null ) {
125+ boolean changed = !o .equals (output );
88126 output = o ;
89127 lastUpdated = System .currentTimeMillis ();
128+ if (changed && onChange != null ) {
129+ try {
130+ onChange .run ();
131+ } catch (Exception e ) {
132+ log .error ("onChange callback error: {}" , e .getMessage ());
133+ }
134+ } else if (!changed ) {
135+ log .debug ("Output unchanged, skipping onChange callback." );
136+ }
90137 }
91138 } else {
92139 log .debug ("Current output is fresh, no refresh required." );
@@ -114,10 +161,14 @@ private static<T> T runRefreshWithTimeout(Supplier<CompletableFuture<T>> action,
114161 }
115162
116163 static <T > RefreshJob <T > create (RefreshStrategy config , Supplier <CompletableFuture <T >> action ) {
164+ return create (config , action , null );
165+ }
166+
167+ static <T > RefreshJob <T > create (RefreshStrategy config , Supplier <CompletableFuture <T >> action , Runnable onChange ) {
117168 if (config instanceof RefreshStrategy .Polling ) {
118- return new Poll <>((RefreshStrategy .Polling )config , action );
169+ return new Poll <>((RefreshStrategy .Polling )config , action , onChange );
119170 } else if (config instanceof RefreshStrategy .OnDemand ) {
120- return new OnDemand <>((RefreshStrategy .OnDemand )config , action );
171+ return new OnDemand <>((RefreshStrategy .OnDemand )config , action , onChange );
121172 }
122173 throw new IllegalArgumentException ("Invalid refresh-strategy: " + config );
123174 }
0 commit comments