Archiver reader lock refactor#3807
Conversation
|
Not sure whether to split this up into multiple MRs:
|
The archive fetch job is already a background thread, and |
|
Is the intent of the added level of threading only for the timeout? Before: With this PR: |
With synchronized hasNext(), a worker blocked inside mainIterator.hasNext() holds the 'this' monitor indefinitely; close() cannot acquire it and hangs. closeCompletesWhileHasNextIsBlocking reproduces this with @timeout(5). Includes FakeDataRetrieval/FakeApplianceArchiveReader/BlockingGenMsgIterator test infrastructure and Mockito dependency. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
hasNext() no longer holds the 'this' monitor while blocking on a streaming network read. close() can now acquire the lock concurrently, set closed=true, and close the underlying stream — which in turn unblocks the reader. next() still guards its mainIterator.next() call with synchronized(this) and rechecks closed, so the close-between-hasNext-and-next race remains safe. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tches twoIteratorFetchesProceedConcurrently shows that two iterators for independent PVs block each other inside fetchDataInternal() because they compete for a single static monitor. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Changing the lock from static to a per-instance final field means each iterator only serialises its own concurrent fetchDataInternal calls. Fetches for different PVs can now proceed concurrently. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ApplianceArchiveReaderTest: verifies enum PVs route to non-numeric iterator, numeric scalars use the optimized path, cancel() closes active iterators, and the WeakHashMap releases references after GC. ApplianceMeanValueIteratorTest: checks the mean_<interval>() operator URL, and that determineDisplay rejects enum/waveform types. ApplianceOptimizedValueIteratorTest: verifies the optimized_N() URL, VStatistics output when useStatistics=true, and VNumber when false. ApplianceNonNumericOptimizedValueIteratorTest: checks n=1 routes raw fetch, n>1 uses nth_N() operator, and the 1.5x boundary gives n=2. ApplianceStatisticsValueIteratorTest: verifies all five operator streams (mean, std, min, max, count) are opened, and that close() closes all five. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
faultySourceTimesOutAndLoopContinues fails: the slow source blocks WorkerThread.run() indefinitely because no timeout guards the fetch loop; @timeout(5) fires after 5 s, proving the bug. Scaffolding added so the tests compile: - archive_read_timeout_ms preference key and 30 s default - ArchiveFetchJob.openReader() hook so TestableFetchJob can inject fake readers without going through ArchiveReaders SPI - Package-private test-only constructor that skips JobManager - Mockito added to databrowser pom Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
WorkerThread.run() now submits each source fetch to Activator.thread_pool and calls Future.get(archive_read_timeout_ms, MILLISECONDS). On timeout: - Future.cancel(true) interrupts the carrier thread - The active ArchiveReader.cancel() is called to close the connection - archiveFetchFailed() is reported and the loop advances to the next source fetchFromSource() extracts the reader-open + iterator-drain logic that previously lived inline in run(), keeping the outer loop readable. archive_read_timeout_ms=0 disables the timeout (Future.get() with no deadline), preserving the previous behaviour when needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Broadens test coverage beyond the three regression scenarios (static lock, hasNext/close deadlock, per-source timeout) to cover the normal code paths that a reader of these classes relies on. - ApplianceValueIteratorExtractDataTest (new, 13 tests): extractData() across all supported payload types — scalar numerics, enum, string, waveform — plus alarm severity and display header extraction. - ApplianceArchiveReaderTest (+6 tests): getOptimizedValues() routing (points≤count→raw, statistics, mean, old-appliance fallback on fetch failure), iterator map registration, and close()→cancel() delegation. - ApplianceStatisticsValueIteratorTest (+2 tests): VStatistics assembly from five sub-iterators, and null-safe next() after close(). - ArchiveFetchJobTest (+7 tests): UnknownChannelException→channelNotFound routing (full and partial), two healthy sources completing together, IOException→archiveFetchFailed, cancel-before-start, and RAW vs OPTIMIZED request type dispatch. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- ApplianceValueIterator.close(): log IOException at WARNING instead of wrapping it in IllegalStateException; move closed=true into finally so it is always set even when the stream close throws - ApplianceStatisticsValueIterator.closeStream(): add package-level logger and emit WARNING instead of silently swallowing the IOException - ApplianceArchiveReader.getOptimizedValues(): log FINE for expected type fallbacks and WARNING when the optimized or count-based path fails - ArchiveFetchJob.WorkerThread: log WARNING for ExecutionException and generic Exception fetch failures, not just TimeoutException Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
0638701 to
dac09a8
Compare
Timeout is now checked in the existing run(JobMonitor) poll loop rather than a per-source virtual-thread watchdog, eliminating the extra thread per source. A stuck source triggers cancel() on the whole job. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
dac09a8 to
4d19a4f
Compare
|
Refactored so the timeout handling is in ArchiveFetchJob.run rather than in ArchiveFetchJob.WorkerThread.run. Also now uses the existing cancel rather than some explicit other way of stopping the job. |
|
@georgweiss Note I didn't add a notification for the timeout in this PR. I'm not sure how to do it, but I thought would be better as a follow up. |
|
kasemir
left a comment
There was a problem hiding this comment.
For the ArchiveFetchJob, this PR basically changes
if (monitor.isCanceled())
worker.cancel();
into
if (monitor.isCanceled() || we_have_exceeded_archive_read_timeout_ms))
worker.cancel();
The ArchiveReader is otherwise created and used the same way,
so this should be perfectly safe for the different archive reader
implementations.
Yes, that is good. |



Refactors for ArchiveFetchJob, ApplianceValueIterator, ApplianceArchiveReader.
Refactors:
Fixes:
Feature:
Testing:
Checklist
Testing:
Documentation: