
khmer multiprocessing + seqan #655

Open · wants to merge 117 commits into master

Conversation

camillescott (Member)

Multiprocessing for khmer.

Addresses #76 and #92; greatly extends khmer's multiprocessing capabilities.

See #638 for original PR.

See #656 for a pull against the seqan branch which masks those changes and improves readability.

See khmer-metrics for some performance profiling.


From original pull:

Example usage from Python can be found here: https://github.com/camillescott/khmer-metrics/blob/master/test_async_diginorm.py

@ctb @mr-c @luizirber thoughts on the Python interface are welcome. As of now, interaction with processed reads is mediated by an iterator over the output queue, which returns khmer::read_parsers::Read objects.


Design

The current design builds off the following assumptions:

  • Writing to the hashtable is quite fast from one thread.
  • Making hashtable writing threadsafe is not practical with bigcount (testing shows that the locking required negates any benefit), and will remain so until bigcount is replaced by larger bin sizes (or we decide to eschew bigcount when using multiprocessing)
  • The main bottleneck for most tasks is read processing, for example in diginorm, finding median counts, etc., which is generally threadsafe
  • Most of our use cases follow a similar structure of pull in reads, do stuff to them using the hashtable, spit them out (or not) to disk

With these constraints in mind, I have begun by focusing on streaming tasks and taking advantage of asynchronous IO and hashtable access. The basic building block is thus the Async abstract base class, which:

  • Takes a khmer::Hashtable instance on construction
  • Defines a lock-free input queue which is templated, usually understood to take HashIntoType, Read, or const char *.
  • Declares (but leaves undefined) a consume method, which is expected to be threadsafe and pull from the input queue
  • Defines a start(int n_threads) method which launches the specified number of threads running consume
  • Defines a stop() method which stops the running consume threads
  • Defines a number of bookkeeping getters, setters, and boolean statuses for managing thread state
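
The contract above can be sketched in Python as a rough analogue (the real implementation is templated C++ over a boost lock-free queue and takes a khmer::Hashtable on construction; ItemCounter, workers_running, and all method bodies here are illustrative, not the PR's actual code):

```python
import queue
import threading

class Async:
    """Illustrative Python analogue of the C++ Async base class."""

    def __init__(self):
        # bounded input queue, mirroring the 50000-item cap described below
        self.input_queue = queue.Queue(maxsize=50000)
        self._threads = []
        self.workers_running = False  # bookkeeping state, exposed via getters in C++

    def consume(self):
        # Declared but left undefined: subclasses supply a threadsafe loop
        # that pulls items off the input queue.
        raise NotImplementedError

    def start(self, n_threads):
        # launch the specified number of threads, all running consume
        self.workers_running = True
        for _ in range(n_threads):
            t = threading.Thread(target=self.consume)
            t.start()
            self._threads.append(t)

    def stop(self):
        # signal the workers to drain and exit, then wait for them
        self.workers_running = False
        for t in self._threads:
            t.join()
        self._threads = []

class ItemCounter(Async):
    """Toy consumer in the spirit of AsyncSequenceWriter: just counts items
    (the real writer breaks sequences into k-mers and hashes them)."""

    def __init__(self):
        super().__init__()
        self.count = 0
        self._lock = threading.Lock()

    def consume(self):
        # keep draining after stop() is signalled until the queue is empty
        while self.workers_running or not self.input_queue.empty():
            try:
                self.input_queue.get(timeout=0.01)
            except queue.Empty:
                continue
            with self._lock:
                self.count += 1
```

The key shape is that start/stop manage thread lifetime in the base class, while each concrete class only supplies a threadsafe consume loop.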

All the actual async implementations build off this class. For example, the AsyncSequenceWriter inherits from Async<const char *>, and its consume method breaks down the input sequences into k-mers and writes them to the given hashtable.

The AsyncSequenceProcessor is another abstract class, built on Async<Read>, which adds an output queue and an additional reader thread; the reader thread parses reads from a file given to start and pushes them asynchronously to the input queue. The consume threads still pull from the input queue, and are expected to push their results to the output queue. It also declares a stop_iter method, used by the Python interface, which returns false when the conditions indicate that all parsing and processing is complete.
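
A minimal Python sketch of that reader/consumer/output flow, with queue.Queue standing in for the lock-free queues (SequencePipeline and processing_done are hypothetical names; processing_done plays the role of the stop_iter signal):

```python
import queue
import threading

class SequencePipeline:
    """Sketch of the AsyncSequenceProcessor flow: a reader thread feeds the
    input queue, consume threads push results to an output queue, and
    iteration drains the output until everything is parsed and processed."""

    def __init__(self):
        self.input_queue = queue.Queue(maxsize=50000)
        self.output_queue = queue.Queue()
        self._parsing_done = False
        self._consumers = []

    def _read(self, reads):
        # the real reader thread parses reads from a file given to start
        for r in reads:
            self.input_queue.put(r)
        self._parsing_done = True

    def _consume(self):
        # pull from the input queue, push results to the output queue
        while not (self._parsing_done and self.input_queue.empty()):
            try:
                r = self.input_queue.get(timeout=0.01)
            except queue.Empty:
                continue
            self.output_queue.put(r.upper())  # stand-in for real processing

    def start(self, reads, n_threads=2):
        threading.Thread(target=self._read, args=(reads,)).start()
        for _ in range(n_threads):
            t = threading.Thread(target=self._consume)
            t.start()
            self._consumers.append(t)

    def processing_done(self):
        # true only once all consumers have exited and the output is drained
        finished = all(not t.is_alive() for t in self._consumers)
        return finished and self.output_queue.empty()

    def __iter__(self):
        while not self.processing_done():
            try:
                yield self.output_queue.get(timeout=0.01)
            except queue.Empty:
                continue
```

Note the ordering argument: a consumer exits only after parsing is done and the input queue is empty, so checking "all consumers dead and output empty" cannot miss an in-flight read.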

AsyncDiginorm (and any other future read processors, say, abundfilt) inherits from AsyncSequenceProcessor. Its consume method implements digital normalization with a cutoff value given to the start method.
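
The per-read decision made in that consume method is standard normalize-by-median, which can be illustrated serially (K, CUTOFF, and the Counter-backed table are toy stand-ins, not khmer's API, and the async hash writer is elided):

```python
from collections import Counter
from statistics import median

K = 4        # toy k-mer size; real runs use larger k
CUTOFF = 3   # coverage cutoff, passed to start in the PR

def kmers(seq, k=K):
    return [seq[i:i + k] for i in range(len(seq) - k + 1)]

def diginorm(reads, cutoff=CUTOFF):
    """Serial sketch of the decision each AsyncDiginorm consume thread
    makes per read."""
    counts = Counter()
    kept = []
    for read in reads:
        # keep the read only if its median k-mer count is below the cutoff
        if median(counts[km] for km in kmers(read)) < cutoff:
            counts.update(kmers(read))  # in the PR this goes to the async writer
            kept.append(read)
    return kept
```

With a cutoff of 3, repeated copies of the same read are kept until their k-mer counts saturate, after which the rest are discarded.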


Python Interface

As expected, the various processors are exposed as Python objects. For now, only AsyncDiginorm is fully wrapped, though AsyncSequenceProcessor is partially wrapped. Their new methods pull the pointer to a Hashtable object from the object's Python wrapper and pass it to the constructor. The progress getters and the start and stop methods are exposed. A user creates a counting hash, then an AsyncDiginorm object, passing that table in; they then call start with the desired cutoff, filename, and number of threads, which launches the parser thread and consume threads asynchronously. The final piece is output, which is the reason AsyncSequenceProcessor is exposed at all: it defines an iterator over the output queue, which calls iter_stop to determine status. Maintaining the class hierarchy in Python-land not only adds structure, but also avoids needing to redefine this iterator for every processor class.


Boost

This implementation uses boost::lockfree::queue (from boost/lockfree/queue.hpp), a non-blocking, lock-free, multi-producer multi-consumer queue. Queues are a potentially huge bottleneck, and these lock-free queues are considerably faster in this case than a trivial locked queue. Their implementation gives them a maximum size of 65535; this doesn't really matter, as I have limited the max queue length to 50000 anyway, to keep the read parser from getting ahead of the processor threads and filling up main memory. The parser thread simply spins until it can push to the queue again.

There is some debate to be had as to whether boost is a good solution, but for now I'd rather spend time working on khmer's internals than reinventing the data structure wheel. At the request of @mr-c, I have packaged a subset of boost in third-party. Conveniently, the boost devs provide a tool called bcp for exactly this purpose. The command I ran to extract the relevant files is:

`bcp boost/lockfree/queue.hpp --namespace=pkgboost --boost=/usr/include/ /w/lockfree`

The --namespace option renames all the boost namespaces. I have done this to make sure users are linking to our version of boost and not a local version they have installed.

This once again adds a pile of new files, but I think avoiding the hassle of implementing the queue ourselves is worthwhile. Note that going this direction also opens up the option of using boost to tackle the streaming problem, but that's for @mr-c, @ctb, and @bocajnotnef to figure out :)

NOTE: I rolled back this change for now because it once again made it impossible to review. However, this is the process we can/should use for final merge.


Further Considerations

An important consideration is that the asynchronous nature of this method means results are not replicable. In particular, digital normalization on smaller inputs can show considerable variability (+/- 20000 reads kept on a 1m read input) because of the async hash writer thread. However, as the hashtable becomes more saturated, the writer thread "catches up" to the processor threads, and the results should converge toward those of a normal, serial run. Curiously, this also means that the program runs faster the longer it runs, approaching the IO speed (given the number of threads, disk speed, etc.), because processor threads are no longer waiting for the writer thread to work through the reads in its queue.


Project TODO

  • General framework for asynchronous processing
  • Implement async hashing
  • Implement async diginorm
  • Explore existing threading performance
  • Expose implementations to Python land
  • Basic tests for async diginorm
  • Pair-awareness
  • Run with acceptance tests
  • Run with very large samples
  • Add further performance profiling to khmer-metrics
  • Test AsyncSequenceProcessor more generally
  • Implement a script for async normalize-by-median
  • Additional / better correctness checks for AsyncDiginorm
  • Additional / better exception handling
  • Add inline documentation
  • Implement AsyncFilterAbund
  • Explore threadsafe writing to the hashtable
  • Discuss the use of boost::lockfree with @ctb
  • Stick relevant parts of boost::lockfree into third-party

Merge Checklist

  • Is it mergable?
  • Did it pass the tests?
  • If it introduces new functionality in scripts/ is it tested?
    Check for code coverage.
  • Is it well formatted? Look at make pep8, make diff_pylint_report,
    make cppcheck, and make doc output. Use make format and manual fixing as needed.
  • Is it documented in the ChangeLog?
  • Was a spellchecker run on the source code and documentation after
    changes were made?

@ctb ctb mentioned this pull request Dec 14, 2014
@ctb ctb changed the title khmer multprocessing + seqan khmer multiprocessing + seqan Dec 14, 2014
camillescott (Member Author)

Re: my previous comment about that serious memory leak, it has been resolved in #692.

mr-c (Contributor) commented Jan 26, 2015
  • @camillescott to make another branch with the Boost bits that Jenkins can build
  • @mr-c to test that branch on the BaTLab systems
  • @camillescott to peel off chunks of that new branch into digestible pull requests for merging

camillescott and others added 8 commits January 26, 2015 16:57
Uses boost's included bcp utility to extract relevant files. Command used was:

`bcp boost/lockfree/queue.hpp --namespace=pkgboost --boost=/usr/include/ third-party/boost`

This places the packaged boost into its own namespace, `pkgboost`, to avoid collisions with existing boost installations.
… issue, add explicit check for is_threadsafe, add more explicit state management to async_sequence_processor
ctb (Member) commented Jun 12, 2015

ping @camillescott

@ctb ctb modified the milestones: 2.0, 1.4+ Jun 12, 2015
@ctb ctb removed this from the 2.0 milestone Jul 22, 2015
@mr-c mr-c added this to the unscheduled milestone Sep 4, 2015
3 participants