
How to Refactor readChunk from ISFTPFile to Stop Using inlineCallbacks?

I'm trying to read from a file over ISFTPFile and I want to avoid using @inlineCallbacks in this scenario. Or maybe there is a better way to read/write with ISFTPFile? @defer.inlin

Solution 1:

You effectively want to map sha256.update over an iterator of file chunks:

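def calculate_checksum():
    hasher = hashlib.sha256()
    chunks = read_those_chunks()
    # On Python 3, map is lazy; list() forces it to consume every chunk.
    list(map(hasher.update, chunks))
    return hasher.hexdigest()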

Note that the explicit iteration from the original calculate_checksums (using the while loop) is now hidden inside of map. Basically, map has replaced the iteration.
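To make the equivalence concrete, here is a minimal synchronous sketch. The in-memory `chunks` list is an assumption standing in for data read from a file. On Python 3 `map` is lazy, so the sketch wraps it in `list()` to force it to visit every chunk.

```python
import hashlib

# Hypothetical in-memory chunks standing in for data read from a file.
chunks = [b"hello ", b"sftp ", b"world"]

# Explicit iteration, as a while/for loop would do it.
loop_hasher = hashlib.sha256()
for chunk in chunks:
    loop_hasher.update(chunk)

# The same work expressed with map. list() forces the lazy map object
# to actually call update() on every chunk.
map_hasher = hashlib.sha256()
list(map(map_hasher.update, chunks))

loop_digest = loop_hasher.hexdigest()
map_digest = map_hasher.hexdigest()
```

Both digests cover the same bytes in the same order, so they should be identical.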

The obstacle is that you want to avoid a read_those_chunks which loads the whole file into memory (presumably). So, as a first step, implement that piece:

def read_those_chunks(open_file, chunk_size):
    offset = 0
    while True:
        yield open_file.readChunk(offset, chunk_size)
        offset += chunk_size

There's a generator that yields Deferreds that fire with subsequent chunks (or EOFError). Unfortunately, you can't use this with map. So now implement a map-alike that can deal with this:

def async_map(function, iterable):
    try:
        d = next(iterable)
    except StopIteration:
        return

    d.addCallback(function)
    d.addCallback(lambda ignored: async_map(function, iterable))
    return d

Since async_map is going to replace map and map replaced the iteration from the original implementation, async_map is still responsible for making sure we visit every chunk from the iterable. However, iteration (with either for or while) doesn't mix well with Deferred (mixing them is when you typically pull out inlineCallbacks). So async_map doesn't iterate. It recurses - a common alternative to iteration. Each recursive call operates on the next element of the iterable until there are no more (or until a Deferred fails, as will happen in this case due to EOFError).

Recursion works better than iteration with Deferred because recursion operates on functions and function calls. Deferred can deal with functions and function calls - pass a function to addCallback and Deferred will eventually call that function. Iteration is made up of small pieces of a function (sometimes called "blocks" or "suites") and Deferred can't deal with these. You can't pass a block to addCallback.

Now use these two to create a Deferred that fires when the digest has been computed:

def calculate_checksum(open_file, chunk_size):
    hasher = hashlib.sha256()
    chunks = read_those_chunks(open_file, chunk_size)
    d = async_map(hasher.update, chunks)
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda ignored: hasher.hexdigest())
    return d

You may also notice that async_map differs from map in that it doesn't produce a list of results of the function calls it makes. Perhaps it's more like reduce:

def async_reduce(function, iterable, lhs):
    try:
        d = next(iterable)
    except StopIteration:
        return lhs

    d.addCallback(lambda rhs: function(lhs, rhs))
    d.addCallback(lambda lhs: async_reduce(function, iterable, lhs))
    return d

It's still recursive instead of iterative, of course.

And a reducing function for computing the hexdigest is like:

def update_hash(hasher, s):
    hasher.update(s)
    return hasher

And so calculate_checksum becomes:

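def calculate_checksum(open_file, chunk_size):
    chunks = read_those_chunks(open_file, chunk_size)
    hasher = hashlib.sha256()
    d = async_reduce(update_hash, chunks, hasher)
    # Reading ends with EOFError, so the failure (not the hasher) arrives
    # here; trap it and read the digest from the local hasher.
    d.addErrback(lambda err: err.trap(EOFError))
    d.addCallback(lambda ignored: hasher.hexdigest())
    return d

which moves the hashing step into a named reducing function. The hasher is still kept in a local variable, because the chain ends with an EOFError failure that does not carry the accumulated value.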

Of course, there are also many other ways you could rewrite this function to avoid inlineCallbacks. The way I've chosen doesn't eliminate the use of a generator function so if that's what you wanted to escape it hasn't really helped. If so, perhaps you can decompose the problem as I have done here into different pieces, none of which involve a generator.
