package reactivestreams;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * Processor that consumes a flow of digit Strings and emits a single Boolean.
 *
 * <p>Incoming digits are counted as positions of a grid that is 9 columns wide, row by row,
 * starting at row 1 / column 1. Only the positions for which
 * {@link #currentDigitApplies(int, int)} returns {@code true} are inspected. The processor
 * emits {@code false} as soon as one of those digits repeats, or {@code true} once 9 distinct
 * digits have been collected, and then cancels its upstream subscription.
 */
public abstract class DigitBlockChecker extends SubmissionPublisher<Boolean>
        implements Flow.Processor<String, Boolean> {

    /** Number of columns per row; also the number of rows requested from upstream. */
    private static final int GRID_SIZE = 9;

    /** Number of distinct digits that must be collected before {@code true} is emitted. */
    private static final int DIGITS_PER_BLOCK = 9;

    /** 1-based row of the digit most recently received. */
    int currentRow;

    /** 1-based column of the digit most recently received (0 before the first digit). */
    int currentColumn;

    /** Upstream subscription, cancelled once the result has been emitted. */
    Flow.Subscription subscription;

    /** Distinct applicable digits seen so far. */
    Collection<String> digitsProcessed;

    /** Set once a result has been submitted so that late signals cannot emit a second one. */
    private boolean resultEmitted;

    /**
     * Resets the position counters and collected digits, then requests a full 9x9 grid of
     * digits from upstream.
     *
     * @param subscription the upstream subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        currentRow = 1;
        currentColumn = 0;
        this.subscription = subscription;
        digitsProcessed = new HashSet<>();
        resultEmitted = false;
        subscription.request(GRID_SIZE * GRID_SIZE);
    }

    /**
     * Advances the grid position and, if the position applies to this checker, records the
     * digit. Emits the result and cancels upstream once the outcome is known.
     *
     * @param digit the digit at the next grid position
     */
    @Override
    public void onNext(String digit) {
        if (resultEmitted) {
            // A subscriber may still receive items after cancel(); ignore them so that
            // exactly one Boolean is ever emitted.
            return;
        }

        currentColumn++;
        if (currentColumn > GRID_SIZE) {
            currentRow++;
            currentColumn = 1;
        }

        if (!currentDigitApplies(currentRow, currentColumn)) {
            return;
        }

        if (digitsProcessed.contains(digit)) {
            emitResult(false);
            return;
        }

        digitsProcessed.add(digit);
        if (digitsProcessed.size() == DIGITS_PER_BLOCK) {
            emitResult(true);
        }
    }

    /**
     * Decides whether the digit at the given grid position belongs to the group of digits
     * this checker validates.
     *
     * @param currentRow 1-based row of the digit
     * @param currentColumn 1-based column of the digit
     * @return {@code true} if the digit at this position must be checked for uniqueness
     */
    protected abstract boolean currentDigitApplies(int currentRow, int currentColumn);

    /**
     * Logs the failure to {@code System.err} and forwards it to downstream subscribers.
     *
     * @param throwable the upstream failure
     */
    @Override
    public void onError(Throwable throwable) {
        // Print the throwable as-is: fillInStackTrace() would replace the upstream stack
        // trace with this method's stack before the error is forwarded.
        System.err.println(throwable);
        closeExceptionally(throwable);
    }

    /**
     * Propagates upstream completion to downstream subscribers. This is normally not reached
     * because the upstream subscription is cancelled as soon as a result has been emitted.
     */
    @Override
    public void onComplete() {
        close();
    }

    /** Submits the result downstream and stops the upstream flow. */
    private void emitResult(boolean unique) {
        resultEmitted = true;
        submit(unique);
        subscription.cancel();
    }
}