# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Tests for the stream classes of the object database"""

from gitdb.test.lib import (
    TestBase,
    DummyStream,
    make_bytes,
    make_object,
    fixture_path
)

from gitdb import (
    DecompressMemMapReader,
    FDCompressedSha1Writer,
    LooseObjectDB,
    Sha1Writer,
    MemoryDB,
    IStream,
)
from gitdb.util import hex_to_bin

import zlib
from gitdb.typ import (
    str_blob_type
)

import tempfile
import os
from io import BytesIO


class TestStream(TestBase):

    """Test stream classes"""

    data_sizes = (15, 10000, 1000 * 1024 + 512)

    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Run the stream read tests - the stream must be rewindable via
        rewind_stream so it can be read a second time
        :param cdata: the data we expect to read from the stream, i.e. its contents
        :param rewind_stream: function called to rewind the stream, making it ready
            for reuse"""
        ns = 10         # number of chunks to read the data in
        assert len(cdata) > ns - 1, "Data must be at least %i bytes long, was %i" % (ns, len(cdata))

        # read in small steps
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk
        # END for each step
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
        # END handle rest

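        # DecompressMemMapReader exposes its compressed source buffer via data() and
        # tracks consumption in compressed_bytes_read() - after draining the stream,
        # the two must agree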
        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

        rewind_stream(stream)

        # read everything
        rdata = stream.read()
        assert rdata == cdata

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

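    # A condensed sketch of how the helper above is driven - test_decompress_reader
    # below does the real thing (not executed here, shown for orientation only):
    #
    #   cdata = make_bytes(10000, randomize=False)
    #   zdata = zlib.compress(cdata)
    #   reader = DecompressMemMapReader(zdata, False, len(cdata))
    #   self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))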
    def test_decompress_reader(self):
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)

                    # zdata = zlib-compressed data (with or without object header)
                    # cdata = original, uncompressed content data

                    # create reader
                    if with_size:
                        # need object data
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
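                        # DecompressMemMapReader.new parses the object header from the
                        # compressed data, which is how it can report type and size here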
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # even if we don't pass the size, it is determined automatically
                        # as soon as it is needed
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)  # _s is the total uncompressed size
                    else:
                        # here we need content data
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
                    # END get reader

                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))

                    # put in a dummy stream for closing
                    dummy = DummyStream()
                    reader._m = dummy
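                    # _m is the stream the reader will close on deletion (if asked to);
                    # substituting a DummyStream lets us observe that behaviour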

                    assert not dummy.closed
                    del reader
                    assert dummy.closed == close_on_deletion
                # END for each datasize
            # END whether size should be used
        # END whether stream should be closed when deleted

    def test_sha_writer(self):
        writer = Sha1Writer()
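        # Sha1Writer merely digests whatever is written and reports the number of
        # bytes consumed - the data itself is not kept around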
        assert 2 == writer.write("hi".encode("ascii"))
        assert len(writer.sha(as_hex=1)) == 40
        assert len(writer.sha(as_hex=0)) == 20

        # make sure it does something ;)
        prev_sha = writer.sha()
        writer.write("hi again".encode("ascii"))
        assert writer.sha() != prev_sha

    def test_compressed_writer(self):
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            ostream = FDCompressedSha1Writer(fd)
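            # FDCompressedSha1Writer digests the plain input while writing the
            # zlib-compressed bytes to the file descriptor it was given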
            data = make_bytes(ds, randomize=False)

            # for now, just a single write, code doesn't care about chunking
            assert len(data) == ostream.write(data)
            ostream.close()

            # it's closed already
            self.assertRaises(OSError, os.close, fd)

            # read everything back, compare to the data we compress ourselves
            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
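            # the writer compresses at zlib level 1 (best speed), which is why
            # recompressing the input the same way must reproduce the file's bytes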
            assert written_data == zlib.compress(data, 1)  # best speed

            os.remove(path)
        # END for each data size

    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
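        # both shas refer to loose objects shipped with the test fixtures; they are
        # the cases for which a buggy reader would come up one byte short (see below)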
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))

            # if there is a bug, we will be missing one byte exactly!
            data = ostream.read()
            assert len(data) == ostream.size

            # Putting it back in should yield nothing new - after all, we store
            # the very same bytes, so the resulting sha must match
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha
        # END for each loose object sha to test