This commit is contained in:
mdecker62
2026-03-03 14:22:48 -05:00
parent 2db7bb51f5
commit d9ba2ef19c
1047 changed files with 205313 additions and 0 deletions

View File

@@ -0,0 +1,65 @@
# Copyright (c) 2008 - 2025, Ilan Schnell; All Rights Reserved
"""
This package defines an object type which can efficiently represent
a bitarray. Bitarrays are sequence types and behave very much like lists.
Please find a description of this package at:
https://github.com/ilanschnell/bitarray
Author: Ilan Schnell
"""
from collections import namedtuple
from bitarray._bitarray import (
bitarray, decodetree, bits2bytes, _bitarray_reconstructor,
get_default_endian, _sysinfo, BITARRAY_VERSION as __version__
)
# Names exported by `from bitarray import *`.
__all__ = ['bitarray', 'frozenbitarray', 'decodetree', 'bits2bytes']
# Named tuple with eight fields describing a bitarray's buffer
# (presumably the value returned by bitarray.buffer_info() - confirm
# against the C extension).
BufferInfo = namedtuple('BufferInfo',
['address', 'nbytes', 'endian', 'padbits',
'alloc', 'readonly', 'imported', 'exports'])
class frozenbitarray(bitarray):
    """frozenbitarray(initializer=0, /, endian='big', buffer=None) -> \
frozenbitarray

    Return a `frozenbitarray` object.  It is initialized exactly like a
    `bitarray` object, but it is immutable and hashable, so it may be used
    as a dictionary key.
    """

    def __init__(self, *args, **kwargs):
        # arguments are not used here; the instance is only frozen
        # (presumably _freeze() marks the buffer read-only - see the
        # note on read-only memory below)
        self._freeze()

    def __repr__(self):
        return 'frozen%s' % bitarray.__repr__(self)

    def __hash__(self):
        "Return hash(self)."
        # hash a big-endian copy so that the result is independent of
        # the bit-endianness of this object
        big = bitarray(self, 'big')
        key = (len(big), big.tobytes())
        return hash(key)

    # Strictly speaking none of the overrides below are required: every one
    # of these methods already raises a TypeError on read-only memory.
    # They are overridden only to give a uniform error message.

    def __delitem__(self, *args, **kwargs):
        ""  # no docstring
        raise TypeError("frozenbitarray is immutable")

    # item assignment and in-place operators
    __setitem__ = __delitem__
    __iadd__ = __imul__ = __delitem__
    __iand__ = __ior__ = __ixor__ = __delitem__
    __ilshift__ = __irshift__ = __delitem__

    # mutating methods
    append = bytereverse = clear = encode = extend = fill = __delitem__
    frombytes = fromfile = insert = invert = pack = pop = __delitem__
    remove = reverse = setall = sort = __delitem__
def test(verbosity=1):
    """test(verbosity=1) -> TextTestResult

    Run the self-test of this package with the given verbosity, and return
    the resulting `unittest.runner.TextTestResult` object.
    """
    # imported lazily, so that the test module is only loaded on demand
    from bitarray import test_bitarray as _tests

    return _tests.run(verbosity=verbosity)

View File

@@ -0,0 +1,170 @@
# Copyright (c) 2021 - 2025, Ilan Schnell; All Rights Reserved
#
# This stub, as well as util.pyi, are tested with Python 3.10 and mypy 1.11.2
from collections.abc import Iterable, Iterator, Sequence
from unittest.runner import TextTestResult
from typing import Any, BinaryIO, Dict, Union, overload, NamedTuple
# Prefix code: maps arbitrary keys (symbols) to bitarrays.  Accepted by
# decodetree(), bitarray.encode() and bitarray.decode() below.
CodeDict = Dict[Any, bitarray]
# Byte containers accepted by bitarray.frombytes() and bitarray.pack().
# Python 3.12 has abc.Buffer which should be used instead
BytesLike = Union[bytes, bytearray]
# Typed declaration of the named tuple returned by bitarray.buffer_info().
class BufferInfo(NamedTuple):
    address: int
    nbytes: int
    endian: str
    padbits: int
    alloc: int
    readonly: bool
    imported: bool
    exports: int
# Stub for the decodetree type.  An instance is built from a CodeDict and
# may be passed to bitarray.decode() in place of the dict itself.
class decodetree:
    def __init__(self, code: CodeDict) -> None: ...
    def complete(self) -> bool: ...
    def nodes(self) -> int: ...
    def todict(self) -> CodeDict: ...
# Stub for the bitarray type.  Individual bits are typed as `int`, and
# optional arguments use `...` as their default, as usual in stub files.
class bitarray:
    def __init__(self,
                 initializer: Union[int, str, Iterable[int], None] = ...,
                 endian: Union[str, None] = ...,
                 buffer: Any = ...) -> None: ...

    # --- regular methods (alphabetical) ---
    def all(self) -> bool: ...
    def any(self) -> bool: ...
    def append(self, value: int) -> None: ...
    def buffer_info(self) -> BufferInfo: ...
    def bytereverse(self,
                    start: int = ...,
                    stop: int = ...) -> None: ...
    def clear(self) -> None: ...
    def copy(self) -> bitarray: ...
    def count(self,
              sub_bitarray: Union[bitarray, int] = ...,
              start: int = ...,
              stop: int = ...,
              step: int = ...) -> int: ...
    def encode(self, code: CodeDict, x: Iterable) -> None: ...
    def decode(self,
               code: Union[CodeDict, decodetree]) -> Iterator: ...
    def extend(self, x: Union[str, Iterable[int]]) -> None: ...
    def fill(self) -> int: ...
    def find(self,
             sub_bitarray: Union[bitarray, int],
             start: int = ...,
             stop: int = ...,
             right: int = ...) -> int: ...
    def frombytes(self, a: BytesLike) -> None: ...
    def fromfile(self, f: BinaryIO, n: int = ...) -> None: ...
    def index(self,
              sub_bitarray: Union[bitarray, int],
              start: int = ...,
              stop: int = ...,
              right: int = ...) -> int: ...
    def insert(self, i: int, value: int) -> None: ...
    def invert(self, i: int = ...) -> None: ...
    def search(self,
               sub_bitarray: Union[bitarray, int],
               start: int = ...,
               stop: int = ...,
               right: int = ...) -> Iterator[int]: ...
    def pack(self, b: BytesLike) -> None: ...
    def pop(self, i: int = ...) -> int: ...
    def remove(self, value: int) -> None: ...
    def reverse(self) -> None: ...
    def setall(self, value: int) -> None: ...
    # `reverse` is optional at runtime, so it needs a default here;
    # otherwise a plain `a.sort()` is rejected by type checkers
    def sort(self, reverse: int = ...) -> None: ...
    def to01(self,
             group: int = ...,
             sep: str = ...) -> str: ...
    def tobytes(self) -> bytes: ...
    def tofile(self, f: BinaryIO) -> None: ...
    def tolist(self) -> list[int]: ...
    def unpack(self,
               zero: bytes = ...,
               one: bytes = ...) -> bytes: ...

    # --- sequence protocol ---
    def __len__(self) -> int: ...
    def __iter__(self) -> Iterator[int]: ...
    # an integer index yields a single bit, anything else a bitarray
    @overload
    def __getitem__(self, i: int) -> int: ...
    @overload
    def __getitem__(self,
                    s: Union[slice, bitarray, Sequence]) -> bitarray: ...
    # either a single bit value or a bitarray may be assigned
    @overload
    def __setitem__(self,
                    i: Union[int, slice, Sequence],
                    o: int) -> None: ...
    @overload
    def __setitem__(self,
                    s: Union[slice, bitarray, Sequence],
                    o: bitarray) -> None: ...
    def __delitem__(self,
                    i: Union[int, slice, bitarray, Sequence]) -> None: ...

    # --- buffer protocol ---
    def __buffer__(self, flags: int, /) -> memoryview: ...
    def __release_buffer__(self, buffer: memoryview, /) -> None: ...

    # --- concatenation and repetition ---
    def __add__(self, other: bitarray) -> bitarray: ...
    def __iadd__(self, other: bitarray) -> bitarray: ...
    def __mul__(self, n: int) -> bitarray: ...
    def __imul__(self, n: int) -> bitarray: ...
    def __rmul__(self, n: int) -> bitarray: ...

    # --- rich comparison ---
    def __ge__(self, other: bitarray) -> bool: ...
    def __gt__(self, other: bitarray) -> bool: ...
    def __le__(self, other: bitarray) -> bool: ...
    def __lt__(self, other: bitarray) -> bool: ...

    # --- bitwise operations ---
    def __and__(self, other: bitarray) -> bitarray: ...
    def __or__(self, other: bitarray) -> bitarray: ...
    def __xor__(self, other: bitarray) -> bitarray: ...
    def __iand__(self, other: bitarray) -> bitarray: ...
    def __ior__(self, other: bitarray) -> bitarray: ...
    def __ixor__(self, other: bitarray) -> bitarray: ...
    def __invert__(self) -> bitarray: ...
    def __lshift__(self, n: int) -> bitarray: ...
    def __rshift__(self, n: int) -> bitarray: ...
    def __ilshift__(self, n: int) -> bitarray: ...
    def __irshift__(self, n: int) -> bitarray: ...

    # data descriptors
    @property
    def endian(self) -> str: ...
    @property
    def nbytes(self) -> int: ...
    @property
    def padbits(self) -> int: ...
    @property
    def readonly(self) -> bool: ...
# Subclass of bitarray which additionally declares __hash__.
class frozenbitarray(bitarray):
    def __hash__(self) -> int: ...
# Module-level names of the package: version string and public helpers.
__version__: str
def bits2bytes(n: int) -> int: ...
def get_default_endian() -> str: ...
def test(verbosity: int = ...) -> TextTestResult: ...
# Private helpers (leading underscore), declared for completeness.
def _sysinfo(key: str) -> int: ...
def _bitarray_reconstructor(cls: type,
buffer: bytes,
endian: str,
padbits: int,
readonly: int) -> bitarray: ...

View File

@@ -0,0 +1,354 @@
/*
Copyright (c) 2008 - 2025, Ilan Schnell; All Rights Reserved
bitarray is published under the PSF license.
Author: Ilan Schnell
*/
#define BITARRAY_VERSION "3.8.0"
#ifdef STDC_HEADERS
# include <stddef.h>
#else
# ifdef HAVE_SYS_TYPES_H
# include <sys/types.h> /* For size_t */
# endif
#endif
/* Compatibility with Visual Studio 2013 and older which don't support
the inline keyword in C (only in C++): use __inline instead.
(copied from pythoncapi_compat.h) */
#if (defined(_MSC_VER) && _MSC_VER < 1900 \
&& !defined(__cplusplus) && !defined(inline))
#define inline __inline
#endif
#ifdef _MSC_VER
#include <intrin.h> /* For _byteswap_uint64() */
#endif
/* --- definitions specific to Python --- */
/* Py_UNREACHABLE was introduced in Python 3.7 */
#ifndef Py_UNREACHABLE
#define Py_UNREACHABLE() assert(0)
#endif
/* --- bitarrayobject --- */
/* Object layout of a bitarray.
   .ob_size is the buffer size (in bytes), not the number of elements.
   The number of elements (bits) is .nbits. */
typedef struct {
    PyObject_VAR_HEAD
    char *ob_item;              /* buffer */
    Py_ssize_t allocated;       /* allocated buffer size (in bytes) */
    Py_ssize_t nbits;           /* length of bitarray, i.e. elements */
    int endian;                 /* bit-endianness of bitarray */
    int ob_exports;             /* how many buffer exports */
    PyObject *weakreflist;      /* list of weak references */
    Py_buffer *buffer;          /* used when importing a buffer */
    int readonly;               /* buffer is readonly */
} bitarrayobject;
/* --- bit-endianness --- */
#define ENDIAN_LITTLE  0
#define ENDIAN_BIG     1

/* default bit-endianness */
#define ENDIAN_DEFAULT  ENDIAN_BIG

#define IS_LE(self)  ((self)->endian == ENDIAN_LITTLE)
#define IS_BE(self)  ((self)->endian == ENDIAN_BIG)

/* endianness as string */
#define ENDIAN_STR(endian)  ((endian) == ENDIAN_LITTLE ? "little" : "big")

/* number of pad bits, i.e. unused bits in the last byte: [0..7] */
#define PADBITS(self)  ((8 - (self)->nbits % 8) % 8)

/* number of bytes necessary to store given number of bits */
#define BYTES(bits)  (((bits) + 7) >> 3)

/* mask selecting bit i within its byte, according to the bit-endianness
   of self - we're not using bitmask_table here, as it is actually slower */
#define BITMASK(self, i)  (((char) 1) << ((self)->endian == ENDIAN_LITTLE ? \
                                          ((i) % 8) : (7 - (i) % 8)))

/* buffer as uint64 array */
#define WBUFF(self)  ((uint64_t *) (self)->ob_item)

/* assert that .nbits is in agreement with .ob_size */
#define assert_nbits(self)  assert(BYTES((self)->nbits) == Py_SIZE(self))

/* assert byte index is in range - every use of the macro arguments is
   parenthesized, such that arbitrary expressions may be passed */
#define assert_byte_in_range(self, j)  \
    assert((self)->ob_item && 0 <= (j) && (j) < Py_SIZE(self))
/* ------------ low level access to bits in bitarrayobject ------------- */
/* Return the value (0 or 1) of the bit at index i.
   The index must be in range - this is only checked by assertions. */
static inline int
getbit(bitarrayobject *self, Py_ssize_t i)
{
    char byte;

    assert_nbits(self);
    assert(0 <= i && i < self->nbits);

    byte = self->ob_item[i >> 3];       /* byte which contains bit i */
    return (byte & BITMASK(self, i)) != 0;
}
/* Set the bit at index i to 1 when vi is non-zero, and to 0 otherwise.
   The index must be in range and the buffer must not be readonly - this
   is only checked by assertions. */
static inline void
setbit(bitarrayobject *self, Py_ssize_t i, int vi)
{
    char *byte;
    char mask;

    assert_nbits(self);
    assert(0 <= i && i < self->nbits);
    assert(self->readonly == 0);

    byte = self->ob_item + (i >> 3);    /* byte which contains bit i */
    mask = BITMASK(self, i);
    if (vi) {
        *byte |= mask;
    }
    else {
        *byte &= ~mask;
    }
}
/* mask for the bit at position k within a byte is: bitmask_table[endian][k]
   (first index: 0 = ENDIAN_LITTLE, 1 = ENDIAN_BIG) */
static const char bitmask_table[2][8] = {
    {0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80},   /* little endian */
    {0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01},   /* big endian */
};

/* character with n leading ones is: ones_table[endian][n]
   ("leading" refers to the first n bits in the given bit-endianness) */
static const char ones_table[2][8] = {
    {0x00, 0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f},   /* little endian */
    {0x00, 0x80, 0xc0, 0xe0, 0xf0, 0xf8, 0xfc, 0xfe},   /* big endian */
};
/* zlc = zeroed last char
   Return the last byte of the buffer with its pad bits cleared.
   When the length of the bitarray is a multiple of 8 (this includes the
   empty bitarray) there are no pad bits, and 0 is returned. */
static inline char
zlc(bitarrayobject *self)
{
    const int used = self->nbits % 8;   /* bits in use in last byte */

    return used ?
        self->ob_item[Py_SIZE(self) - 1] & ones_table[IS_BE(self)][used] : 0;
}
/* Return a uint64_t word representing the last (up to 63) remaining bits
   of the buffer.  All missing bytes (to complete the word) and padbits are
   treated as zeros.
   If the length of the bitarray is a multiple of 64 (which also includes
   an empty bitarray), 0 is returned. */
static inline uint64_t
zlw(bitarrayobject *self)       /* zlw = zeroed last word */
{
    const size_t nbits = self->nbits;
    const size_t nw = (nbits / 64) * 8;  /* bytes in complete words */
    const size_t nr = (nbits % 64) / 8;  /* complete remaining bytes */
    uint64_t res = 0;

    assert(nw + nr == nbits / 8 && 8 * (nw + nr) + nbits % 8 == nbits);
    /* copy the complete bytes which follow the last complete word into
       the first bytes of the result - the other bytes remain zero */
    memcpy((char *) &res, self->ob_item + nw, nr);
    /* a partially used last byte is appended with its pad bits zeroed */
    if (nbits % 8)
        *(((char *) &res) + nr) = zlc(self);

    return res;
}
/* Zero out the pad bits in the last byte of the buffer, unless the buffer
   is readonly (in which case nothing is done).  self->nbits is unchanged. */
static inline void
set_padbits(bitarrayobject *self)
{
    int used;                   /* bits in use in last byte */

    if (self->readonly)
        return;

    used = self->nbits % 8;
    if (used == 0)              /* no pad bits */
        return;

    self->ob_item[Py_SIZE(self) - 1] &= ones_table[IS_BE(self)][used];
}
/* population count - return the number of 1 bits in the given word */
static inline int
popcnt_64(uint64_t x)
{
#if (defined(__clang__) || defined(__GNUC__))
    return __builtin_popcountll(x);
#else
    /* portable fallback, see popcount64c in:
       https://en.wikipedia.org/wiki/Hamming_weight */
    const uint64_t mask_1 = 0x5555555555555555;   /* every other bit */
    const uint64_t mask_2 = 0x3333333333333333;   /* pairs of bits */
    const uint64_t mask_4 = 0x0f0f0f0f0f0f0f0f;   /* low nibbles */
    const uint64_t byte_sum = 0x0101010101010101;

    x = x - ((x >> 1) & mask_1);                  /* counts per 2 bits */
    x = (x & mask_2) + ((x >> 2) & mask_2);       /* counts per 4 bits */
    x = (x + (x >> 4)) & mask_4;                  /* counts per byte */
    x *= byte_sum;              /* top byte is now the sum of all bytes */
    return (int) (x >> 56);
#endif
}
/* return the parity of the given word: 1 when the number of 1 bits is
   odd, and 0 when it is even */
static inline int
parity_64(uint64_t x)
{
#if (defined(__clang__) || defined(__GNUC__))
    return __builtin_parityll(x);
#else
    /* fold the word onto itself until bit 0 is the xor of all 64 bits */
    x ^= x >> 32;
    x ^= x >> 16;
    x ^= x >> 8;
    x ^= x >> 4;
    x ^= x >> 2;
    x ^= x >> 1;
    return (int) (x & 1);
#endif
}
/* Return the given word with the order of its 8 bytes reversed, using a
   compiler intrinsic.  As a side effect (of preprocessing this function),
   the macro HAVE_BUILTIN_BSWAP64 is defined as 1 when such an intrinsic is
   available, and as 0 otherwise.  In the latter case this function only
   contains Py_UNREACHABLE(), so it must not be called. */
static inline uint64_t
builtin_bswap64(uint64_t word)
{
#if (defined(__clang__) ||                                                 \
      (defined(__GNUC__)                                                   \
        && ((__GNUC__ >= 5) || (__GNUC__ == 4) && (__GNUC_MINOR__ >= 3))))
    /* __builtin_bswap64() is available since GCC 4.3 */
#  define HAVE_BUILTIN_BSWAP64  1
    return __builtin_bswap64(word);
#elif defined(_MSC_VER)
#  define HAVE_BUILTIN_BSWAP64  1
    return _byteswap_uint64(word);
#else
#  define HAVE_BUILTIN_BSWAP64  0
    Py_UNREACHABLE();
#endif
}
/* reverse (in-place) the order of the first n bytes of p */
static inline void
swap_bytes(char *p, Py_ssize_t n)
{
    Py_ssize_t lo = 0, hi = n - 1;

    /* exchange the outermost pair of bytes, and move inwards */
    while (lo < hi) {
        const char tmp = p[lo];

        p[lo++] = p[hi];
        p[hi--] = tmp;
    }
}
/* write 256 characters into table for given kernel operation

   table[k] is the result of applying the operation kop to the byte
   value k.  The operation is selected by a single character (see the
   cases below).  The results are accumulated in a char. */
static inline void
setup_table(char *table, char kop)
{
    int k;

    for (k = 0; k < 256; k++) {
        char t = 0, j;
        for (j = 0; j < 8; j++) {
            if (k & 1 << j) {
                /* j are the indices of active bits in k (little endian) */
                switch (kop) {
                case 'a': t += j;        break;  /* add active indices */
                case 'A': t += 7 - j;    break;  /* 'a' for big endian */
                case 's': t += j * j;            /* add squares of active indices */
                    break;
                case 'S': t += (7-j) * (7-j);    /* 's' for big endian */
                    break;
                case 'x': t ^= j;        break;  /* xor active indices */
                case 'X': t ^= 7 - j;    break;  /* 'x' for big endian */
                case 'c': t++;           break;  /* bit count */
                case 'p': t ^= 1;        break;  /* parity */
                case 'r': t |= 128 >> j; break;  /* reverse bits */
                default: Py_UNREACHABLE();       /* unknown operation */
                }
            }
        }
        table[k] = t;
    }
}
/* Return the number of bytes [0..3] by which p has to be advanced in order
   to reach the next address which is a multiple of 4.
   While modern compilers may tolerate misaligned uint64_t pointers, older
   ones may have problems with them.  Moreover, misaligned access may be
   slower (even on modern compilers). */
static inline int
to_aligned(void *p)
{
    const uintptr_t addr = (uintptr_t) p;

    return (int) ((4 - addr % 4) % 4);
}
/* Return the total population count of the n words starting at the
   uint64_t pointer w.  The assertion requires n >= 0 and an address
   which is a multiple of 4. */
static inline Py_ssize_t
popcnt_words(uint64_t *w, Py_ssize_t n)
{
    Py_ssize_t total = 0, k;

    assert(n >= 0 && ((uintptr_t) w) % 4 == 0);
    for (k = 0; k < n; k++)
        total += popcnt_64(w[k]);

    return total;
}
/* Adjust slice parameters such that step is always positive.
   This produces simpler loops over elements when their order is irrelevant.
   Moreover, for step = -1, we can now use set_span() in set_range() and
   count_span() in count_range().

   The parameters start, stop and step are updated in-place; slicelength
   is only read.  When step is already positive, nothing is changed.
*/
static inline void
adjust_step_positive(Py_ssize_t slicelength,
                     Py_ssize_t *start, Py_ssize_t *stop, Py_ssize_t *step)
{
    if (*step < 0) {
        /* the old start is the highest index: it becomes the last one */
        *stop = *start + 1;
        /* new start: old start + step * (slicelength - 1), which is the
           lowest index - note that *stop is already (old start) + 1 */
        *start = *stop + *step * (slicelength - 1) - 1;
        *step = -(*step);
    }
    assert(*start >= 0 && *stop >= 0 && *step > 0 && slicelength >= 0);
    /* slicelength == 0 implies stop <= start */
    assert(slicelength != 0 || *stop <= *start);
    /* step == 1 and slicelength != 0 implies stop - start == slicelength */
    assert(*step != 1 || slicelength == 0 || *stop - *start == slicelength);
}
/* Convert the Python object to a bit value (0 or 1), and store it at the
   address vi.  Return 1 on success.  On failure, return 0 with an exception
   set: the one raised by the integer conversion, or a ValueError when the
   integer is neither 0 nor 1.  In that case *vi is not modified. */
static inline int
conv_pybit(PyObject *value, int *vi)
{
    const Py_ssize_t n = PyNumber_AsSsize_t(value, NULL);

    if (n == -1 && PyErr_Occurred())
        return 0;

    if (n < 0 || n > 1) {
        PyErr_Format(PyExc_ValueError, "bit must be 0 or 1, got %zd", n);
        return 0;
    }
    *vi = (int) n;
    return 1;
}
/* Return 0 when both bitarrays have the same length and the same
   bit-endianness.  Otherwise, set a ValueError and return -1.
   The length is checked first. */
static inline int
ensure_eq_size_endian(bitarrayobject *a, bitarrayobject *b)
{
    const char *msg = NULL;

    if (a->nbits != b->nbits)
        msg = "bitarrays of equal length expected";
    else if (a->endian != b->endian)
        msg = "bitarrays of equal bit-endianness expected";

    if (msg == NULL)
        return 0;

    PyErr_SetString(PyExc_ValueError, msg);
    return -1;
}
/* Equivalent to: import bitarray; return getattr(bitarray, name)
   Return the result of the attribute lookup, or NULL when the import
   itself fails. */
static inline PyObject *
bitarray_module_attr(char *name)
{
    PyObject *module, *attr = NULL;

    module = PyImport_ImportModule("bitarray");
    if (module != NULL) {
        attr = PyObject_GetAttrString(module, name);
        Py_DECREF(module);      /* module itself is no longer needed */
    }
    return attr;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,632 @@
# Copyright (c) 2019 - 2025, Ilan Schnell; All Rights Reserved
# bitarray is published under the PSF license.
#
# Author: Ilan Schnell
"""
Useful utilities for working with bitarrays.
"""
import os
import sys
import math
import random
from bitarray import bitarray, bits2bytes
from bitarray._util import (
zeros, ones, count_n, parity, _ssqi, xor_indices,
count_and, count_or, count_xor, any_and, subset,
correspond_all, byteswap,
serialize, deserialize,
ba2hex, hex2ba,
ba2base, base2ba,
sc_encode, sc_decode,
vl_encode, vl_decode,
canonical_decode,
)
# Public API of this module: the names imported from bitarray._util above,
# together with the functions defined in this file.
__all__ = [
'zeros', 'ones', 'urandom', 'random_k', 'random_p', 'gen_primes',
'pprint', 'strip', 'count_n',
'parity', 'sum_indices', 'xor_indices',
'count_and', 'count_or', 'count_xor', 'any_and', 'subset',
'correspond_all', 'byteswap', 'intervals',
'ba2hex', 'hex2ba',
'ba2base', 'base2ba',
'ba2int', 'int2ba',
'serialize', 'deserialize',
'sc_encode', 'sc_decode',
'vl_encode', 'vl_decode',
'huffman_code', 'canonical_huffman', 'canonical_decode',
]
def urandom(__length, endian=None):
    """urandom(n, /, endian=None) -> bitarray

    Return a bitarray of `n` random bits (obtained from `os.urandom()`).
    """
    nbytes = bits2bytes(__length)
    result = bitarray(os.urandom(nbytes), endian)
    # only whole bytes can be requested - drop the surplus bits
    del result[__length:]
    return result
def random_k(__n, k, endian=None):
    """random_k(n, /, k, endian=None) -> bitarray

    Return a (pseudo-) random bitarray of length `n` in which exactly `k`
    elements are one.  Mathematically, this is the same as setting all bits
    at indices `random.sample(range(n), k)` to one in a bitarray of
    length `n`.
    The result is reproducible when Python's `random.seed()` is given a
    specific seed value.
    """
    # the generator is created first (before `k` is checked)
    generator = _Random(__n, endian)
    if isinstance(k, int):
        return generator.random_k(k)
    raise TypeError("int expected, got '%s'" % type(k).__name__)
def random_p(__n, p=0.5, endian=None):
    """random_p(n, /, p=0.5, endian=None) -> bitarray

    Return a (pseudo-) random bitarray of length `n`, in which each bit is
    one with probability `p`, independently of all other bits.
    Mathematically equivalent to
    `bitarray((random() < p for _ in range(n)), endian)`, but much faster
    for large `n`.  The result is reproducible when Python's `random.seed()`
    is given a specific seed value.

    Python 3.12 or higher is required, as the standard library function
    `random.binomialvariate()` is used.  On older Python versions a
    `NotImplementedError` is raised.
    """
    if sys.version_info[:2] >= (3, 12):
        return _Random(__n, endian).random_p(p)

    raise NotImplementedError("bitarray.util.random_p() requires "
                              "Python 3.12 or higher")
class _Random:
    # The main reason for this class is to enable testing functionality
    # individually in the test class Random_P_Tests in 'test_util.py'.
    # The test class also contains many comments and explanations.
    # To better understand how the algorithm works, see ./doc/random_p.rst
    # See also, VerificationTests in devel/test_random.py
    #
    # All random numbers are taken from the `random` module.  The order of
    # the calls into that module therefore determines the result for a
    # given seed.

    # maximal number of calls to .random_half() in .combine_half()
    M = 8

    # number of resulting probability intervals
    K = 1 << M

    # limit for setting individual bits randomly
    SMALL_P = 0.01

    def __init__(self, n=0, endian=None):
        # n:      length (in bits) of the bitarrays to generate
        # endian: bit-endianness passed on to all bitarrays created
        self.n = n
        self.nbytes = bits2bytes(n)
        self.endian = endian

    def random_half(self):
        """
        Return bitarray with each bit having probability p = 1/2 of being 1.
        """
        nbytes = self.nbytes
        # use random module function for reproducibility (not urandom())
        b = random.getrandbits(8 * nbytes).to_bytes(nbytes, 'little')
        a = bitarray(b, self.endian)
        # whole bytes were generated - remove the surplus bits
        del a[self.n:]
        return a

    def op_seq(self, i):
        """
        Return bitarray containing operator sequence.
        Each item represents a bitwise operation:   0: AND   1: OR
        After applying the sequence (see .combine_half()), we
        obtain a bitarray with probability  q = i / K
        """
        if not 0 < i < self.K:
            raise ValueError("0 < i < %d, got i = %d" % (self.K, i))

        # sequence of &, | operations - least significant operations first
        a = bitarray(i.to_bytes(2, byteorder="little"), "little")
        # a[j] is bit j of i: the sequence consists of the bits above the
        # lowest set bit of i, up to (and excluding) bit M
        return a[a.index(1) + 1 : self.M]

    def combine_half(self, seq):
        """
        Combine random bitarrays with probability 1/2
        according to given operator sequence.
        """
        a = self.random_half()
        for k in seq:
            # each operation consumes one more random bitarray
            if k:
                a |= self.random_half()
            else:
                a &= self.random_half()
        return a

    def random_k(self, k):
        # Return random bitarray of length self.n with exactly k bits set.
        # Raises ValueError unless 0 <= k <= n.
        n = self.n
        # error check inputs and handle edge cases
        if k <= 0 or k >= n:
            if k == 0:
                return zeros(n, self.endian)
            if k == n:
                return ones(n, self.endian)
            raise ValueError("k must be in range 0 <= k <= n, got %s" % k)

        # exploit symmetry to establish: k <= n // 2
        if k > n // 2:
            a = self.random_k(n - k)
            a.invert()  # use in-place to avoid copying
            return a

        # decide on sequence, see VerificationTests devel/test_random.py
        if k < 16 or k * self.K < 3 * n:
            # few bits to set: start from all zeros (see below)
            i = 0
        else:
            p = k / n  # p <= 0.5
            p -= (0.2 - 0.4 * p) / math.sqrt(n)
            i = int(p * (self.K + 1))

        # combine random bitarrays using bitwise AND and OR operations
        if i < 3:
            # start with no bits set, all k bits are set individually
            a = zeros(n, self.endian)
            diff = -k
        else:
            a = self.combine_half(self.op_seq(i))
            diff = a.count() - k

        # adjust the count to exactly k by flipping individual bits, each
        # at a random index which still has the unwanted value
        randrange = random.randrange
        if diff < 0:  # not enough bits 1 - increase count
            for _ in range(-diff):
                i = randrange(n)
                while a[i]:
                    i = randrange(n)
                a[i] = 1
        elif diff > 0:  # too many bits 1 - decrease count
            for _ in range(diff):
                i = randrange(n)
                while not a[i]:
                    i = randrange(n)
                a[i] = 0

        return a

    def random_p(self, p):
        # Return random bitarray of length self.n in which each bit is 1
        # with probability p.  Raises ValueError unless 0.0 <= p <= 1.0.

        # error check inputs and handle edge cases
        if p <= 0.0 or p == 0.5 or p >= 1.0:
            if p == 0.0:
                return zeros(self.n, self.endian)
            if p == 0.5:
                return self.random_half()
            if p == 1.0:
                return ones(self.n, self.endian)
            raise ValueError("p must be in range 0.0 <= p <= 1.0, got %s" % p)

        # for small n, use literal definition
        if self.n < 16:
            return bitarray((random.random() < p for _ in range(self.n)),
                            self.endian)

        # exploit symmetry to establish: p < 0.5
        if p > 0.5:
            a = self.random_p(1.0 - p)
            a.invert()  # use in-place to avoid copying
            return a

        # for small p, set randomly individual bits
        if p < self.SMALL_P:
            return self.random_k(random.binomialvariate(self.n, p))

        # calculate operator sequence
        i = int(p * self.K)
        if p * (self.K + 1) > i + 1: # see devel/test_random.py
            i += 1
        seq = self.op_seq(i)
        # probability obtained by applying the sequence
        q = i / self.K

        # when n is small compared to number of operations, also use literal
        if self.n < 100 and self.nbytes <= len(seq) + 3 * bool(q != p):
            return bitarray((random.random() < p for _ in range(self.n)),
                            self.endian)

        # combine random bitarrays using bitwise AND and OR operations
        a = self.combine_half(seq)
        # correct the probability from q to p with one more random bitarray
        if q < p:
            # OR with probability x gives: q + (1 - q) * x = p
            x = (p - q) / (1.0 - q)
            a |= self.random_p(x)
        elif q > p:
            # AND with probability x gives: q * x = p
            x = p / q
            a &= self.random_p(x)

        return a
def gen_primes(__n, endian=None, odd=False):
    """gen_primes(n, /, endian=None, odd=False) -> bitarray

    Generate a bitarray of length `n` whose active indices are the prime
    numbers.  By default (`odd=False`), an active index is itself a prime
    number.  With `odd=True`, only odd numbers are represented in the
    resulting bitarray `a`, such that `a[i]` tells whether `2*i+1` is prime.
    """
    n = int(__n)
    if n < 0:
        raise ValueError("bitarray length must be >= 0")

    # one period of the sieve with all multiples of the low primes removed
    if odd:
        sieve = ones(105, endian)  # 105 = 3 * 5 * 7
        for first, prime in (1, 3), (2, 5), (3, 7):
            sieve[first::prime] = 0
        head = "01110110"
    else:
        sieve = ones(210, endian)  # 210 = 2 * 3 * 5 * 7
        for prime in 2, 3, 5, 7:
            sieve[::prime] = 0
        head = "00110101"

    # repeating the period is faster than setting the multiples of the
    # low primes to 0 in an array of full length
    period = len(sieve)
    sieve *= (n + period - 1) // period
    # the low primes themselves (and 0, 1) need to be corrected
    sieve[:8] = bitarray(head, endian)
    del sieve[n:]

    # perform sieve starting at 11
    if odd:
        limit = int(math.sqrt(n // 2) + 1.0)
        for i in sieve.search(1, 5, limit):  # 11//2 = 5
            number = 2 * i + 1
            sieve[(number * number) // 2 :: number] = 0
    else:
        limit = int(math.sqrt(n) + 1.0)
        # i*i is always odd, and even bits are already 0: use step 2*i
        for i in sieve.search(1, 11, limit):
            sieve[i * i :: 2 * i] = 0

    return sieve
def sum_indices(__a, mode=1):
    """sum_indices(a, /, mode=1) -> int

    Return sum of indices of all active bits in bitarray `a`.
    Equivalent to `sum(i for i, v in enumerate(a) if v)`.
    `mode=2` sums square of indices.

    Raises `ValueError` when `mode` is neither 1 nor 2.
    """
    if mode not in (1, 2):
        raise ValueError("unexpected mode %r" % mode)

    # For details see: devel/test_sum_indices.py
    n = 1 << 19  # block size  512 Kbits
    if len(__a) <= n:  # shortcut for single block
        return _ssqi(__a, mode)

    # Constants
    m = n // 8  # block size in bytes
    o1 = n * (n - 1) // 2        # sum of all indices in a full block
    o2 = o1 * (2 * n - 1) // 3   # sum of all squared indices in a full block

    nblocks = (len(__a) + n - 1) // n
    padbits = __a.padbits
    sm = 0
    for i in range(nblocks):
        # use memoryview to avoid copying memory
        v = memoryview(__a)[i * m : (i + 1) * m]
        block = bitarray(None, __a.endian, buffer=v)
        if padbits and i == nblocks - 1:
            # presumably the last block covers whole bytes and so includes
            # the pad bits of `a` - clear them (on a copy, when the
            # imported buffer cannot be written to)
            if block.readonly:
                block = bitarray(block)
            block[-padbits:] = 0

        k = block.count()
        if k:
            y = n * i  # index offset of this block
            # sum of indices within the block (known for a full block)
            z1 = o1 if k == n else _ssqi(block)
            if mode == 1:
                # sum of (y + j) over the k active j:  k * y + z1
                sm += k * y + z1
            else:
                z2 = o2 if k == n else _ssqi(block, 2)
                # sum of (y + j)**2:  k * y**2 + 2 * y * z1 + z2
                sm += (k * y + 2 * z1) * y + z2

    return sm
def pprint(__a, stream=None, group=8, indent=4, width=80):
    """pprint(bitarray, /, stream=None, group=8, indent=4, width=80)

    Pretty-print a bitarray object to `stream`, which defaults to
    `sys.stdout`.  By default, the bits are grouped in bytes (8 bits), and
    64 bits are written per line.
    Objects which are not bitarrays are printed using `pprint.pprint()`.
    """
    out = sys.stdout if stream is None else stream

    if not isinstance(__a, bitarray):
        import pprint as _pprint
        _pprint.pprint(__a, stream=out, indent=indent, width=width)
        return

    group = int(group)
    if group < 1:
        raise ValueError('group must be >= 1')

    indent = int(indent)
    if indent < 0:
        raise ValueError('indent must be >= 0')

    width = int(width)
    if width <= indent:
        raise ValueError('width must be > %d (indent)' % indent)

    # elements per line: as many whole groups as fit (each group is
    # followed by one space); when not even one group fits, fall back
    # to the space that remains
    per_line = group * ((width - indent) // (group + 1))
    if per_line == 0:
        per_line = width - indent - 2

    type_name = type(__a).__name__
    # here 4 is len("'()'")
    multiline = len(type_name) + 4 + len(__a) + len(__a) // group >= width
    quotes = "'''" if multiline else ("'" if __a else "")

    out.write("%s(%s" % (type_name, quotes))
    for i, bit in enumerate(__a):
        if i % per_line == 0:
            # start of a line
            if multiline:
                out.write('\n%s' % (indent * ' '))
        elif i % group == 0:
            # separate the groups within a line
            out.write(' ')
        out.write(str(bit))

    if multiline:
        out.write('\n')

    out.write("%s)\n" % quotes)
    out.flush()
def strip(__a, mode='right'):
    """strip(bitarray, /, mode='right') -> bitarray

    Return a new bitarray with zeros stripped from left, right or both ends.
    Allowed values for mode are the strings: `left`, `right`, `both`

    Raises `TypeError` when `mode` is not a string, and `ValueError` when
    it is not one of the allowed values.
    """
    if not isinstance(mode, str):
        # report the type of the offending argument, i.e. of `mode`
        raise TypeError("str expected for mode, got '%s'" %
                        type(mode).__name__)
    if mode not in ('left', 'right', 'both'):
        raise ValueError("mode must be 'left', 'right' or 'both', got %r" %
                         mode)

    # index of the first 1 (not needed when only stripping the right end)
    start = None if mode == 'right' else __a.find(1)
    if start == -1:
        # there is no 1 at all: everything is stripped
        return __a[:0]
    # index after the last 1 - when there is no 1, find() gives -1, such
    # that stop is 0 and the result is empty as well
    stop = None if mode == 'left' else __a.find(1, right=1) + 1
    return __a[start:stop]
def intervals(__a):
    """intervals(bitarray, /) -> iterator

    Compute all uninterrupted intervals of 1s and 0s, and return an iterator
    over tuples `(value, start, stop)`.  The intervals are yielded in order,
    and none of them is empty (`stop - start > 0`).
    """
    try:
        current = __a[0]  # value of the first interval
    except IndexError:
        # empty bitarray: no intervals
        return

    total = len(__a)
    start = 0
    while start < total:
        # the interval ends where the opposite value occurs next
        try:
            stop = __a.index(not current, start)
        except ValueError:
            stop = total
        yield int(current), start, stop
        # the following interval has the opposite value, and starts
        # where this one has stopped
        current = not current
        start = stop
def ba2int(__a, signed=False):
    """ba2int(bitarray, /, signed=False) -> int

    Convert the given bitarray to an integer, respecting the bit-endianness
    of the bitarray.
    `signed` indicates whether two's complement is used to represent the
    integer.
    """
    if not isinstance(__a, bitarray):
        raise TypeError("bitarray expected, got '%s'" % type(__a).__name__)

    nbits = len(__a)
    if nbits == 0:
        raise ValueError("non-empty bitarray expected")

    if __a.padbits:
        # extend to whole bytes with zeros on the most significant side
        pad = zeros(__a.padbits, __a.endian)
        if __a.endian == "little":
            __a = __a + pad
        else:
            __a = pad + __a

    value = int.from_bytes(__a.tobytes(), byteorder=__a.endian)

    # in two's complement, a set top bit means the value is negative
    if signed and value >> (nbits - 1):
        value -= 1 << nbits

    return value
def int2ba(__i, length=None, endian=None, signed=False):
    """int2ba(int, /, length=None, endian=None, signed=False) -> bitarray

    Convert the given integer to a bitarray with given bit-endianness.
    Unless the `length` of the bitarray is provided, the result has no
    leading (big-endian) / trailing (little-endian) zeros.  An
    `OverflowError` is raised when the integer cannot be represented with
    the given number of bits.
    `signed` determines whether two's complement is used to represent the
    integer, and requires `length` to be provided.
    """
    if not isinstance(__i, int):
        raise TypeError("int expected, got '%s'" % type(__i).__name__)

    if length is not None:
        if not isinstance(length, int):
            raise TypeError("int expected for argument 'length'")
        if length <= 0:
            raise ValueError("length must be > 0")

    if signed:
        if length is None:
            raise TypeError("signed requires argument 'length'")
        bound = 1 << (length - 1)
        if not (-bound <= __i < bound):
            raise OverflowError("signed integer not in range(%d, %d), "
                                "got %d" % (-bound, bound, __i))
        if __i < 0:
            # two's complement representation of negative numbers
            __i += 1 << length
    elif length and __i >> length:  # unsigned
        raise OverflowError("unsigned integer not in range(0, %d), "
                            "got %d" % (1 << length, __i))

    a = bitarray(0, endian)
    nbytes = bits2bytes(__i.bit_length())
    a.frombytes(__i.to_bytes(nbytes, byteorder=a.endian))
    le = a.endian == 'little'

    if length is None:
        if a:
            # remove the zeros on the most significant side
            return strip(a, 'right' if le else 'left')
        # the integer 0 is represented by a single bit
        return a + '0'

    surplus = len(a) - length
    if surplus > 0:
        # whole bytes were created: drop the most significant bits
        return a[:length] if le else a[-length:]
    if surplus == 0:
        return a

    # too short: pad with zeros on the most significant side
    pad = zeros(-surplus, a.endian)
    return a + pad if le else pad + a
# ------------------------------ Huffman coding -----------------------------
def _huffman_tree(__freq_map):
"""_huffman_tree(dict, /) -> Node
Given a dict mapping symbols to their frequency, construct a Huffman tree
and return its root node.
"""
from heapq import heappush, heappop
class Node(object):
"""
There are to tyes of Node instances (both have 'freq' attribute):
* leaf node: has 'symbol' attribute
* parent node: has 'child' attribute (tuple with both children)
"""
def __lt__(self, other):
# heapq needs to be able to compare the nodes
return self.freq < other.freq
minheap = []
# create all leaf nodes and push them onto the queue
for sym, f in __freq_map.items():
leaf = Node()
leaf.symbol = sym
leaf.freq = f
heappush(minheap, leaf)
# repeat the process until only one node remains
while len(minheap) > 1:
# take the two nodes with lowest frequencies from the queue
# to construct a new parent node and push it onto the queue
parent = Node()
parent.child = heappop(minheap), heappop(minheap)
parent.freq = parent.child[0].freq + parent.child[1].freq
heappush(minheap, parent)
# the single remaining node is the root of the Huffman tree
return minheap[0]
def huffman_code(__freq_map, endian=None):
    """huffman_code(dict, /, endian=None) -> dict

    Given a frequency map, a dictionary mapping symbols to their frequency,
    calculate the Huffman code, i.e. a dict mapping those symbols to
    bitarrays (with given bit-endianness).  Note that the symbols are not
    limited to being strings.  Symbols may be any hashable object.

    Raises `TypeError` when the frequency map is not a dict, and
    `ValueError` when it is empty.
    """
    if not isinstance(__freq_map, dict):
        raise TypeError("dict expected, got '%s'" % type(__freq_map).__name__)

    if len(__freq_map) < 2:
        if len(__freq_map) == 0:
            raise ValueError("cannot create Huffman code with no symbols")
        # Only one symbol: Normally if only one symbol is given, the code
        # could be represented with zero bits.  However here, the code should
        # be at least one bit for the .encode() and .decode() methods to work.
        # So we represent the symbol by a single code of length one, in
        # particular one 0 bit.  This is an incomplete code, since if a 1 bit
        # is received, it has no meaning and will result in an error.
        sym = list(__freq_map)[0]
        return {sym: bitarray('0', endian)}

    result = {}
    # The tree is traversed with an explicit stack (not recursively), as
    # the depth of the tree can be as large as the number of symbols minus
    # one, which may exceed the recursion limit of the interpreter.
    empty = bitarray(0, endian)
    pending = [(_huffman_tree(__freq_map), empty)]
    while pending:
        nd, prefix = pending.pop()
        try:
            sym = nd.symbol
        except AttributeError:  # parent
            # child 0 is pushed last, such that it is visited first
            pending.append((nd.child[1], prefix + '1'))
            pending.append((nd.child[0], prefix + '0'))
            continue
        result[sym] = prefix  # leaf

    return result
def canonical_huffman(__freq_map):
    """canonical_huffman(dict, /) -> tuple

    Compute the canonical Huffman code for a frequency map (a dictionary
    mapping each symbol to its frequency).  The returned tuple holds:

    0. a dict mapping each symbol to its (big-endian) bitarray code
    1. a list whose item at index `n` is the number of symbols having a
       code of length `n`
    2. the list of symbols in canonical order

    Note: items 1 and 2 may be used as input for `canonical_decode()`.

    Raises TypeError when the argument is not a dict, and ValueError when
    it contains no symbols.
    """
    if not isinstance(__freq_map, dict):
        raise TypeError("dict expected, got '%s'" % type(__freq_map).__name__)

    nsym = len(__freq_map)
    if nsym == 0:
        raise ValueError("cannot create Huffman code with no symbols")
    if nsym == 1:
        # A single symbol is represented by the one-bit code 0, because
        # codes are required to be at least one bit long.
        only = next(iter(__freq_map))
        return {only: bitarray('0', 'big')}, [0, 1], [only]

    depth_of = {}  # symbol -> depth of its leaf, i.e. its code length

    def walk(node, depth):
        # Only the depth at which each symbol is found matters here; the
        # actual bit patterns are assigned further down.
        try:
            symbol = node.symbol
        except AttributeError:
            # parent node: both children are one level deeper
            for child in node.child:
                walk(child, depth + 1)
        else:
            depth_of[symbol] = depth

    walk(_huffman_tree(__freq_map), 0)

    # (symbol, code length) pairs ordered by code length; the sort is
    # stable, so symbols of equal length stay in tree-traversal order
    table = sorted(depth_of.items(), key=lambda pair: pair[1])
    symbols = [sym for sym, _ in table]
    lengths = [n for _, n in table]

    count = (lengths[-1] + 1) * [0]
    codedict = {}
    code = 0
    # Each length is paired with the length that follows it in the table;
    # the final entry has no successor and is paired with None.
    for sym, length, following in zip(symbols, lengths, lengths[1:] + [None]):
        codedict[sym] = int2ba(code, length, 'big')
        count[length] += 1
        if following is not None:
            # the next code is one higher, padded with zero bits on the
            # right whenever the next code is longer
            code = (code + 1) << (following - length)

    return codedict, count, symbols

View File

@@ -0,0 +1,84 @@
# Copyright (c) 2021 - 2025, Ilan Schnell; All Rights Reserved
from collections import Counter
from collections.abc import Iterable, Iterator, Sequence
from typing import Any, AnyStr, BinaryIO, Optional, Union
from bitarray import bitarray, BytesLike, CodeDict
# Frequency map as used by the Huffman stubs in this file (`_huffman_tree`,
# `huffman_code`): either a Counter keyed by int, or a dict mapping any
# symbol to an int or float frequency.
FreqMap = Union[Counter[int], dict[Any, Union[int, float]]]
# Functions that take an int (`length` or `n`) plus an optional bit-endianness
# and return a bitarray.  In a stub, `= ...` only marks a parameter as
# optional; the actual default values are defined by the implementation.
def zeros(length: int, endian: Optional[str] = ...) -> bitarray: ...
def ones(length: int, endian: Optional[str] = ...) -> bitarray: ...
def urandom(length: int, endian: Optional[str] = ...) -> bitarray: ...
# NOTE(review): `p` carries no annotation, so type checkers treat it as Any.
# Presumably it is a probability (float) -- confirm against the
# implementation before annotating it.
def random_p(n: int,
p = ...,
endian: Optional[str] = ...) -> bitarray: ...
def random_k(n: int,
k: int,
endian: Optional[str] = ...) -> bitarray: ...
def gen_primes(n: int,
endian: Optional[str] = ...,
odd: Optional[bool] = ...) -> bitarray: ...
# NOTE(review): `stream` is declared as BinaryIO -- confirm whether the
# implementation writes text or bytes to it.
def pprint(a: Any, stream: BinaryIO = ...,
group: int = ...,
indent: int = ...,
width: int = ...) -> None: ...
def strip(a: bitarray, mode: str = ...) -> bitarray: ...
# Functions that take one bitarray and return an int.
def count_n(a: bitarray,
n: int,
value: int = ...) -> int: ...
def parity(a: bitarray) -> int: ...
def sum_indices(a: bitarray, mode: int = ...) -> int: ...
def xor_indices(a: bitarray) -> int: ...
# Functions that take two bitarrays.
def count_and(a: bitarray, b: bitarray) -> int: ...
def count_or(a: bitarray, b: bitarray) -> int: ...
def count_xor(a: bitarray, b: bitarray) -> int: ...
def any_and(a: bitarray, b: bitarray) -> bool: ...
def subset(a: bitarray, b: bitarray) -> bool: ...
# The return type is a bare `tuple`: neither its length nor its element types
# are declared here.
def correspond_all(a: bitarray, b: bitarray) -> tuple: ...
# Returns None; the first argument is a BytesLike, not necessarily a bitarray.
def byteswap(a: BytesLike, n: int) -> None: ...
# The return type is a bare `Iterator`: the type of the yielded items is not
# declared here.
def intervals(a: bitarray) -> Iterator: ...
# Conversions between bitarrays and str / int.  The `ba2...` functions take a
# bitarray, the `...2ba` functions return one.  Parameters typed AnyStr accept
# either str or bytes.
def ba2hex(a: bitarray,
group: int = ...,
sep: str = ...) -> str: ...
def hex2ba(s: AnyStr,
endian: Optional[str] = ...) -> bitarray: ...
def ba2base(n: int,
a: bitarray,
group: int = ...,
sep: str = ...) -> str: ...
def base2ba(n: int,
s: AnyStr,
endian: Optional[str] = ...) -> bitarray: ...
# `signed` is declared as int (not bool) in both integer conversions.
def ba2int(a: bitarray, signed: int = ...) -> int: ...
# NOTE(review): `endian` is declared as `str` here, whereas every other stub
# in this file declares it as `Optional[str]` -- confirm whether None is
# accepted by the implementation.
def int2ba(i: int,
length: int = ...,
endian: str = ...,
signed: int = ...) -> bitarray: ...
# Encoders take a bitarray and return bytes.  `deserialize` takes a BytesLike,
# while the `sc_` and `vl_` decoders take any iterable of ints.
def serialize(a: bitarray) -> bytes: ...
def deserialize(b: BytesLike) -> bitarray: ...
def sc_encode(a: bitarray) -> bytes: ...
def sc_decode(stream: Iterable[int]) -> bitarray: ...
def vl_encode(a: bitarray) -> bytes: ...
def vl_decode(stream: Iterable[int],
endian: Optional[str] = ...) -> bitarray: ...
# Returns the root node of the Huffman tree.  The node class is local to the
# implementation, so the return type can only be declared as Any.
def _huffman_tree(freq_map: FreqMap) -> Any: ...
# Returns a CodeDict; per the implementation's docstring this maps each
# symbol of the frequency map to a bitarray of the given bit-endianness.
def huffman_code(freq_map: FreqMap,
endian: Optional[str] = ...) -> CodeDict: ...
# The parameter is named and annotated like the sibling Huffman stubs
# (`_huffman_tree`, `huffman_code`), which both take `freq_map: FreqMap`.
# Returns a 3-tuple: the canonical Huffman code (symbol -> bitarray), a list
# with the number of symbols of each code length, and the list of symbols in
# canonical order.
def canonical_huffman(freq_map: FreqMap) -> tuple[CodeDict, list[int], list[Any]]: ...
# `count` and `symbol` correspond to the two lists returned by
# `canonical_huffman()`, whose docstring says they may be used as input here.
# The return type is a bare `Iterator`: the type of the yielded items is not
# declared.
def canonical_decode(a: bitarray,
count: Sequence[int],
symbol: Iterable[Any]) -> Iterator: ...