viewgit/inc/functions.php:22 Function utf8_encode() is deprecated [8192]
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2022  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2022
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/**
 * Loads crawlLog functions if needed
 */
require_once __DIR__ . "/Utility.php";
/**
 * This class implements a linear hash table for storing records that use
 * PackedTableTools for their format
 *
 * @author Chris Pollett
 */
class LinearHashTable
{
    /**
     * Name of staging partition used for rows before they are added to the
     * main linear hash table
     */
    const ACTIVE_INDEX = "active";
    /**
     * Compression strategy used to compress blob and serial columns
     */
    const DEFAULT_COMPRESSOR = C\NS_COMPRESSORS . "NonCompressor";
    /**
     * Default parameters to use when constructing a LinearHashTable
     */
    const DEFAULT_PARAMETERS = ["COMPRESSOR" => self::DEFAULT_COMPRESSOR,
        "COUNT" => 0,
        "PARTITION_SIZE_THRESHOLD" => self::PARTITION_SIZE_THRESHOLD,
        "FORMAT" => ["PRIMARY KEY" => "KEY", "VALUE" => "BLOB"],
        "MAX_ITEMS_PER_FILE" => self::MAX_ITEMS_PER_FILE,
        "SAVE_PARTITION" => 0, "ACTIVE_COUNT" => 0
    ];
    /**
     * File extension to use for hash index files for partitions
     * These are the file buckets which have names based on portions of hash
     * key values and store records for keys matching the bit pattern
     */
    const HASH_INDEX_EXTENSION = ".hix";
    /**
     * Maximum number of indexes to partitions to cache for reads
     */
    const INDEX_CACHE_SIZE = 100;
    /**
     * For each archive partition, the file with the list of keys (not hashes
     * of keys) that were added for this partition will be prefixed with this
     * string.
     */
    const KEY_PREFIX = "key_";
    /**
     * Default maximum number of records to store in a hash table partition
     */
    const MAX_ITEMS_PER_FILE = 16384;
    /**
     * File name of file used to store the parameters of this
     * LinearHashTable
     */
    const PARAMETERS_FILE = "lht_parameters.txt";
    /**
     * Prefix to file names of LinearHashTable partition files
     */
    const PARTITION_PREFIX = "partition_";
    /**
     * Maximum number of bytes a partition can have before a split.
     * Notice this implies a maximum file size to store
     * in BLOB columns
     */
    const PARTITION_SIZE_THRESHOLD = 2147483648;
    /**
     * In memory copy of the active partition index. key => packed_row pairs
     * for the active partition.
     * @var array
     */
    public $active_index;
    /**
     * Field variable used as a cache for the file handle, file name, and
     * time of the last archive file for a partition added to
     * @var array
     */
    public $add_archive_cache = [null, "", -1];
    /**
     * Field variable used as a cache for the file handle, file name, and
     * time of the last key index file for a partition added to
     * (the key index file is just a file of the keys that appear
     * in a hash table partition). Its intended use is to aid in
     * the creation of additional index structures on top of the linear
     * hash table
     * @var array
     */
    public $add_key_archive_cache = [null, "", -1];
    /**
     * Field variable used as a cache for the bucket path, bucket contents,
     * and time of the last index hash bucket file that was added to
     * @var array
     */
    public $add_index_cache = ["", [], -1];
    /**
     * Field variable used as a cache for the file handle, file name, and
     * time of the last archive file for a partition accessed for a value
     * @var array
     */
    public $get_archive_cache = [null, "", -1];
    /**
     * Array of column names for the columns in a LinearHashTable which
     * are of type BLOB or SERIAL
     * @var array
     */
    public $blob_columns;
    /**
     * The seekquarry\yioop\library\compressors\Compressor object used to
     * compress record files and blob items.
     * @var object
     */
    public $compressor;
    /**
     * Folder path where the LinearHashTable is stored
     * @var string
     */
    public $folder;
    /**
     * Used to cache hash_bucket_path => contents_of_hash_bucket pairs
     * to speed up retrieval and update.
     * @var array
     */
    public $index_buffer;
    /**
     * Used to keep track of when this instance was created, as part of managing
     * file handles expiration (could be set/updated externally to reflect
     * some other instance using the table)
     * @var int
     */
    public $instance_time;
    /**
     * Name of primary key column for records
     * @var string
     */
    public $key_field;
    /**
     * Stores the constructor parameters used to create this
     * LinearHashTable
     * @var array
     */
    public $parameters;
    /**
     * File handle of the active index file that new rows are appended to.
     * Declared explicitly so that assigning it does not create a dynamic
     * property (deprecated as of PHP 8.2).
     * @var resource|null
     */
    public $save_partition_handle = null;
    /**
     * Array of column names for the columns in a LinearHashTable which
     * are of type SERIAL
     * @var array
     */
    public $serial_columns;
    /**
     * The PackedTableTools object used to pack and unpack records in
     * LinearHashTable
     * @var object
     */
    public $table_tools;
    /**
     * Creates/Loads LinearHashTable having specified folder and parameters
     *
     * @param string $folder is the folder for storing the LinearHashTable files
     * @param array $format the column names, keys and types for this
     *      LinearHashTable object
     * @param int $max_items_per_file maximum number of items to store
     *      in a partition before making the next partition
     * @param int $partition_size_threshold maximum length of a partition
     *      file in bytes before a new partition file should be started
     * @param string $compressor_type name of the
     *      seekquarry\yioop\library\compressors\Compressor class that is
     *      instantiated and used to
     *      compress index files and blob items.
* @param bool $with_key_archive whether to create and add to a key archive
     *      when doing inserts
     */
    public function __construct($folder,
        $format = self::DEFAULT_PARAMETERS["FORMAT"],
        $max_items_per_file = self::MAX_ITEMS_PER_FILE,
        $partition_size_threshold = self::PARTITION_SIZE_THRESHOLD,
        $compressor_type = self::DEFAULT_COMPRESSOR,
        $with_key_archive = false)
    {
        $initial_parameters = self::DEFAULT_PARAMETERS;
        $initial_parameters["PARTITION_SIZE_THRESHOLD"] =
            $partition_size_threshold;
        $initial_parameters["MAX_ITEMS_PER_FILE"] = $max_items_per_file;
        $initial_parameters["COMPRESSOR"] = $compressor_type;
        $this->compressor = new $compressor_type();
        $initial_parameters["FORMAT"] = $format;
        $initial_parameters["HAS_KEY_ARCHIVE"] = $with_key_archive;
        $this->instance_time = hrtime(true);
        $this->folder = $folder;
        $folder_paths = [$folder];
        $changed_parameters = false;
        foreach ($folder_paths as $folder_path) {
            if (!file_exists($folder_path)) {
                $changed_parameters = true;
                if (!mkdir($folder_path)) {
                    return null;
                }
            }
        }
        $this->parameters = self::getParameterInfo($folder);
        if (!is_array($this->parameters)) {
            // unreadable parameter file: start from the initial parameters
            $this->parameters = [];
        }
        /* Iterate over $initial_parameters rather than DEFAULT_PARAMETERS:
           HAS_KEY_ARCHIVE is not a key of DEFAULT_PARAMETERS, so iterating
           over the constant silently dropped the $with_key_archive argument
         */
        foreach ($initial_parameters as $field => $value) {
            if (!isset($this->parameters[$field])) {
                $this->parameters[$field] = $value;
                $changed_parameters = true;
            }
        }
        $format = $this->parameters["FORMAT"];
        $packed_table_format = [];
        $this->blob_columns = [];
        $this->serial_columns = [];
        $this->index_buffer = [];
        $this->key_field = null;
        foreach ($format as $field_name => $type) {
            $upper_field_name = strtoupper($field_name);
            $upper_type = strtoupper($type);
            if ($upper_field_name == "PRIMARY KEY") {
                $packed_table_format["PRIMARY KEY"] = $type;
                $this->key_field = $type;
            } else if (in_array($upper_type, ["BLOB", "SERIAL"])) {
                $this->blob_columns[] = $field_name;
                if ($upper_type == "SERIAL") {
                    $this->serial_columns[] = $field_name;
                }
                // blob/serial columns are stored in the index as integers
                $packed_table_format[$field_name] = "INT";
            } else if (isset(PackedTableTools::TYPE_SYNONYMS[$upper_type])) {
                $packed_table_format[$field_name] =
                    PackedTableTools::TYPE_SYNONYMS[$upper_type];
            } else {
                return null;
            }
        }
        if (empty($this->key_field)) {
            return null;
        }
        if (!empty($this->blob_columns)) {
            $packed_table_format["LAST_BLOB_LEN"] = "INT";
            $packed_table_format["PARTITION"] = "INT";
        }
        $this->table_tools = new PackedTableTools($packed_table_format);
        if ($changed_parameters) {
            $this->saveParameters();
        }
    }
    /**
     * Returns the record associated with the given $key in the
     * LinearHashTable. The active index is searched first, then the
     * hash bucket for the key.
     *
     * @param string $key a key to look up
     * @param array $fields which fields (columns) from the record to return.
     *      Either a list of column names or an associative array of
     *      column name => output name pairs. If empty all columns are
     *      returned
     * @param bool $is_hash_key has the supplied $key already been hashed using
     *      the LinearHashTable's hash function (the function used to determine
     *      the partition a key should be in)
     * @return mixed array containing unpacked record associated with $key if
     *      it exists in the table, false otherwise
     */
    public function get($key, $fields = [], $is_hash_key = false)
    {
        $hash_key = ($is_hash_key) ? $key : $this->hashKey($key);
        $table_tools = $this->table_tools;
        $active_info = $this->getIndexInfo(self::ACTIVE_INDEX);
        $index_data_raw = false;
        if (!empty($active_info) && !empty($active_info[1])) {
            list(, $index) = $active_info;
            $index_data_raw = $table_tools->find($index, $hash_key);
        }
        if (empty($index_data_raw)) {
            $index_info = $this->getIndexInfo($hash_key);
            if (empty($index_info) || empty($index_info[1])) {
                return false;
            }
            list(, $index) = $index_info;
            $index_data_raw = $table_tools->find($index, $hash_key);
        }
        if (empty($index_data_raw)) {
            return false;
        }
        if (!empty($fields)) {
            // a plain list of names is turned into name => name pairs
            if (isset($fields[0]) && $fields == array_values($fields)) {
                $fields = array_combine($fields, $fields);
            }
        }
        $index_data = $table_tools->unpack($index_data_raw)[0];
        if (!empty($this->blob_columns)) {
            $num_blob_columns = count($this->blob_columns);
            /* the first blob column holds the offset into the archive, each
               later blob column holds the length of the preceding blob, and
               LAST_BLOB_LEN the length of the final blob (see put())
             */
            $offset = intval($index_data[$this->blob_columns[0]]);
            $partition = $index_data["PARTITION"];
            $partition_filename = $this->getPartition($partition);
            for ($i = 0; $i < $num_blob_columns; $i++) {
                $column_name = $this->blob_columns[$i];
                $len = ($i + 1 < $num_blob_columns) ?
                    intval($index_data[$this->blob_columns[$i + 1]]) :
                    $index_data["LAST_BLOB_LEN"];
                if (empty($fields) || isset($fields[$column_name])) {
                    $index_data[$column_name] = ($len == 0) ? "" :
                        $this->getArchive($partition_filename, $offset, $len);
                }
                $offset += $len;
            }
            unset($index_data["LAST_BLOB_LEN"], $index_data["PARTITION"]);
        }
        if (empty($fields)) {
            foreach ($this->serial_columns as $field_name) {
                $index_data[$field_name] = unserialize(
                    $index_data[$field_name]);
            }
            $index_data[$this->key_field] = $key;
            $out_data = $index_data;
        } else {
            $out_data = [];
            if (isset($fields[$this->key_field])) {
                $out_data[$fields[$this->key_field]] = $key;
                unset($fields[$this->key_field]);
            }
            foreach ($fields as $in_name => $out_name) {
                if (isset($index_data[$in_name])) {
                    if (in_array($in_name, $this->serial_columns)) {
                        $out_data[$out_name] = unserialize(
                            $index_data[$in_name]);
                    } else {
                        $out_data[$out_name] = $index_data[$in_name];
                    }
                }
            }
        }
        return $out_data;
    }
    /**
     * Returns the blob item from $archive_filename at $offset of length $len,
     * uncompressing the result. The last opened file handle is cached in
     * $this->get_archive_cache and reused for reads of the same file.
     *
     * @param string $archive_filename path to an archive node file for this
     *      LinearHashTable
     * @param int $offset byte offset into archive node file
     * @param int $len length of blob item
     * @return mixed uncompressed blob item from $archive_filename, or false
     *      if the file could not be opened or the seek failed
     */
    public function getArchive($archive_filename, $offset, $len)
    {
        list($fh, $previous_archive_filename, $previous_instance_time) =
            $this->get_archive_cache;
        $compressor = $this->compressor;
        if (!is_resource($fh) ||
            $previous_archive_filename != $archive_filename ||
            $previous_instance_time != $this->instance_time) {
            if (is_resource($fh)) {
                fclose($fh);
            }
            $fh = fopen($archive_filename, "r");
            if ($fh === false) {
                /* without this guard fseek(false, ...) throws a TypeError
                   under PHP 8
                 */
                $this->get_archive_cache = [null, "", -1];
                return false;
            }
            $previous_archive_filename = $archive_filename;
            $previous_instance_time = $this->instance_time;
        }
        $value = false;
        if (fseek($fh, $offset) == 0) {
            $compressed_file = fread($fh, $len);
            $value = $compressor->uncompress($compressed_file);
        }
        $this->get_archive_cache = [$fh, $previous_archive_filename,
            $previous_instance_time];
        return $value;
    }
    /**
     * Returns the path to the archive file for the partition $i
     * (file used to store blob and serial columns)
     *
     * @param int $i index of the partition
     * @return string path to partition $i archive file
     */
    public function getPartition($i)
    {
        return $this->folder . "/" . self::PARTITION_PREFIX . $i .
            $this->compressor->fileExtension();
    }
    /**
     * Returns the path to the key index file for the partition $i
     * (file used to store the keys added while $i was the save partition)
     *
     * @param int $i index of the partition
     * @return string path to partition $i key index file
     */
    public function getKeyPartition($i)
    {
        return $this->folder . "/" . self::KEY_PREFIX . $i .
            self::HASH_INDEX_EXTENSION;
    }
    /**
     * Returns the path to the index file for the active partition (the
     * key value pairs not yet stored in the main LinearHashTable)
     *
     * @return string path to active partition
     */
    public function getActiveIndex()
    {
        return $this->folder . "/" . self::ACTIVE_INDEX .
            self::HASH_INDEX_EXTENSION;
    }
    /**
     * Checks if a key exists in the linear hash table
     * @param string $key key to check
     * @param bool $compute_hash whether the linear hash table's hash function
     *      still needs to be applied to $key (true), or $key is already a
     *      hash (false)
     * @param bool $check_active whether to check the active index of
     *      buffered key values that have not yet been put into the main table
     * @return bool whether the $key exists in the linear hash table
     */
    public function exists($key, $compute_hash = true, $check_active = true)
    {
        $hash_key = ($compute_hash) ? $this->hashKey($key) : $key;
        $table_tools = $this->table_tools;
        if ($check_active) {
            $active_info = $this->getIndexInfo(self::ACTIVE_INDEX);
            if (!empty($active_info) && !empty($active_info[1])) {
                list(, $index) = $active_info;
                $row = $table_tools->find($index, $hash_key);
                if ($row != null) {
                    return true;
                }
            }
        }
        list(, $index) = $this->getIndexInfo($hash_key);
        if ($index) {
            $row = $table_tools->find($index, $hash_key);
            if ($row != null) {
                return true;
            }
        }
        return false;
    }
    /**
     * Used to add a new record or an array of new records to the
     * LinearHashTable. Rows are appended to the active index; blob and
     * serial column values are appended to the current save partition
     * archive file.
     *
     * @param array $row_or_rows either array of record with fields given
     *      by this LinearHashTable's signature or an array of rows.
     * @param bool $is_hash_key whether or not the key_field in the row or rows
     *      has already been hashed to a string suitable for storage in this
     *      LinearHashTable
     * @param bool $allow_duplicates whether to allow multiple records with
     *      same key or not. When false, the put fails on the first row
     *      whose key already exists.
     * @return bool success (true) or not (false) for the insert
     */
    public function put($row_or_rows, $is_hash_key = false,
        $allow_duplicates = true)
    {
        $rows = (empty($row_or_rows[$this->key_field])) ? $row_or_rows :
            [$row_or_rows];
        $table_tools = $this->table_tools;
        $num_rows = count($rows);
        $i = 0;
        foreach ($rows as $row) {
            crawlTimeoutLog("..still adding pages linear hash table. Added %s" .
                " of %s.", $i, $num_rows);
            $key = $row[$this->key_field] ?? false;
            if ($key === false) {
                crawlLog("LinearHashTable Put Failed A");
                return false;
            }
            unset($row[$this->key_field]);
            $value = $row;
            $hash_key = ($is_hash_key) ? $key : $this->hashKey($key);
            if (!$allow_duplicates && $this->exists($hash_key, false)) {
                crawlLog("LinearHashTable Put Failed B");
                return false;
            }
            $save_partition = $this->parameters["SAVE_PARTITION"];
            $save_partition_name = $this->getPartition($save_partition);
            $active_index_name = $this->getActiveIndex();
            if (($this->parameters['ACTIVE_COUNT'] >
                $this->parameters["MAX_ITEMS_PER_FILE"]) || (
                file_exists($save_partition_name) &&
                filesize($save_partition_name) >
                $this->parameters["PARTITION_SIZE_THRESHOLD"])) {
                $this->advanceSavePartition();
            }
            $blob_columns = $this->blob_columns ?? [];
            $serial_columns = $this->serial_columns ?? [];
            $format = $this->parameters["FORMAT"];
            unset($format['PRIMARY KEY']);
            $format_keys = array_keys($format);
            $out_value = [];
            foreach ($format_keys as $format_key) {
                $out_value[$format_key] = $value[$format_key] ?? null;
            }
            if (!empty($blob_columns)) {
                $old_offset = 0;
                $first = true;
                $has_key_archive = $this->parameters["HAS_KEY_ARCHIVE"] ??
                    false;
                foreach ($blob_columns as $blob_column) {
                    $blob = $out_value[$blob_column] ?? "";
                    if (in_array($blob_column, $serial_columns)) {
                        $blob = serialize($blob);
                    }
                    if (($add_info = $this->addArchive($blob)) === false) {
                        crawlLog("LinearHashTable Put Failed C");
                        return false;
                    }
                    list($offset, $len, $partition) = $add_info;
                    if ($first && $has_key_archive) {
                        $this->addKeyArchive($key);
                        $first = false;
                    }
                    /* first blob column stores the archive offset, later
                       ones the difference from the previous offset
                     */
                    $out_value[$blob_column] = $offset - $old_offset;
                    $old_offset = $offset;
                }
                $out_value["LAST_BLOB_LEN"] = $len;
                $out_value["PARTITION"] = $partition;
            }
            $out_value = $table_tools->pack($out_value);
            if (empty($this->save_partition_handle) || !is_resource(
                $this->save_partition_handle)) {
                $handle = fopen($active_index_name, "c+");
                if ($handle === false) {
                    // avoid a TypeError from fseek() on a failed open
                    crawlLog("LinearHashTable Put Failed E");
                    return false;
                }
                $this->save_partition_handle = $handle;
                fseek($this->save_partition_handle, 0, SEEK_END);
            }
            $this->active_index = [];
            /* the above forces getIndexInfo to reload active index
               on a look up
             */
            if ($table_tools->add($this->save_partition_handle, $hash_key,
                $out_value, PackedTableTools::ADD_FILE_HANDLE)) {
                $this->parameters['ACTIVE_COUNT']++;
            } else {
                crawlLog("LinearHashTable Put Failed D");
                return false;
            }
            $i++;
        }
        $this->saveParameters();
        return true;
    }
    /**
     * Save the active partition to $new_save_partition and advances the
     * next save partition to be $new_save_partition + 1. If
     * $new_save_partition ==0, $this->parameters["SAVE_PARTITION"] + 1 is
     * used for $new_save_partition . If
     * $new_save_partition <= $this->parameters["SAVE_PARTITION"], then
     * this function returns false. Saving the active partition means
     * moving key => record entries from the active index to their appropriate
     * linear hash table buckets. The Archive file for the save partition is
     * closed, and a new file handle for the next save partition file is opened.
     *
     * @param int $new_save_partition index of the save partition when starting
     *      a new one.
* @return bool success (true) or failure (false)
     */
    public function advanceSavePartition($new_save_partition = 0)
    {
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $new_save_partition = ($new_save_partition > 0) ?
            $new_save_partition : $save_partition + 1;
        if ($new_save_partition <= $save_partition) {
            return false;
        }
        $active_index_name = $this->getActiveIndex();
        $active_index_info = $this->getIndexInfo(self::ACTIVE_INDEX);
        if (!empty($active_index_info) && !empty($active_index_info[1])) {
            list(, $active_index) = $active_index_info;
            foreach ($active_index as $active_key => $active_row) {
                $this->putIndex($active_key, $active_row, true, 0, true);
            }
            // empty key flushes the bucket still buffered by addIndex
            $this->addIndex("", "");
        }
        if (!empty($this->save_partition_handle) &&
            is_resource($this->save_partition_handle)) {
            fclose($this->save_partition_handle);
        }
        if (file_exists($active_index_name)) {
            unlink($active_index_name);
        }
        /* the active index file is gone, so drop the in-memory copy too;
           otherwise getIndexInfo keeps returning the migrated entries as
           if they were still active
         */
        $this->active_index = [];
        $this->parameters["SAVE_PARTITION"] = $new_save_partition;
        $this->parameters['ACTIVE_COUNT'] = 0;
        return true;
    }
    /**
     * Deletes a key or set of keys and their associated values from the
     * linear hash table. Stops at the first key that cannot be found.
     *
     * @param mixed $key_or_keys either the string of a key or an array of
     *      strings of keys to delete
     * @param bool $is_hash_key whether or not the key or keys have already
     *      been hashed using the linear hash table's hash function
     * @return bool success (true) or failure (false) for deletion
     */
    public function delete($key_or_keys, $is_hash_key = false)
    {
        $keys = (is_array($key_or_keys)) ? $key_or_keys : [$key_or_keys];
        $table_tools = $this->table_tools;
        $num_deleted = 0;
        foreach ($keys as $key) {
            $hash_key = ($is_hash_key) ? $key : $this->hashKey($key);
            $active_info = $this->getIndexInfo(self::ACTIVE_INDEX);
            $is_active = false;
            /* must be reset for every key: a row found for the previous key
               would otherwise make this key look like an active-index hit
             */
            $index_data_row = null;
            if (!empty($active_info) && !empty($active_info[1])) {
                list($hash_path, $index) = $active_info;
                $index_data_row = $table_tools->find($index, $hash_key);
            }
            if (!empty($index_data_row)) {
                if (!empty($this->save_partition_handle) &&
                    is_resource($this->save_partition_handle)) {
                    // the active index file is about to be rewritten
                    fclose($this->save_partition_handle);
                }
                $is_active = true;
            } else {
                $index_info = $this->getIndexInfo($hash_key);
                $index_data_row = null;
                if (!empty($index_info) && !empty($index_info[1])) {
                    list($hash_path, $index) = $index_info;
                    $index_data_row = $table_tools->find($index, $hash_key);
                }
                if (empty($index_data_row)) {
                    if ($num_deleted > 0) {
                        // persist counts changed by the earlier deletions
                        $this->saveParameters();
                    }
                    return false;
                }
            }
            $table_tools->delete($index, $hash_key);
            if (empty($index)) {
                $this->unlinkHashPath($hash_path);
            } else {
                $table_tools->save($hash_path . self::HASH_INDEX_EXTENSION,
                    $index);
            }
            if ($is_active) {
                $this->active_index = $index;
                /* put() counts active rows in ACTIVE_COUNT, so that is the
                   counter to undo here; SAVE_PARTITION is the index of the
                   archive file being written and must not change on delete
                 */
                $this->parameters['ACTIVE_COUNT'] = max(0,
                    ($this->parameters['ACTIVE_COUNT'] ?? 0) - 1);
            } else {
                $this->index_buffer[$hash_path] = $index;
                if ($this->checkSplitMerge(
                    $this->parameters['COUNT'] - 1) < 0) {
                    $this->mergeMigrate();
                }
                $this->parameters['COUNT']--;
            }
            $num_deleted++;
        }
        $this->saveParameters();
        return true;
    }
    /**
     * Computes the fixed length hash value of a $key for the purposes of
     * storing the key associated with a value in this linear hash table.
     * @param string $key to hash
     * @return string fixed length (16 byte raw md5) result of hashing
     */
    public function hashKey($key)
    {
        return md5($key, true);
    }
    /**
     * Gets the file path for the bucket associated with $hash_key,
     * assuming the number of items in the linear hash table is $count,
     * $max_items_per_file is the maximum number of items per file.
*
     * @param string $hash_key LHT hash of a key value
     * @param int $count number of items to assume in table (defaults to the
     *      number currently stored)
     * @param int $max_items_per_file maximum number of items per bucket
     *      (defaults to this parameter for the current LHT)
     * @param bool $mkdir_if_not_exists make folders along path that don't
     *      exist already if true.
     * @return string path to bucket
     */
    public function getHashPath($hash_key, $count = -1,
        $max_items_per_file = -1, $mkdir_if_not_exists = false)
    {
        /* One-entry memo of the last computed path. Static locals are
           shared by every instance, so the folder is part of the memo key,
           and the -1 defaults are resolved before comparing so a changed
           COUNT cannot return a stale path.
         */
        static $old_folder = null;
        static $old_hash_key = null;
        static $old_count = -1;
        static $old_max_items_per_file = -1;
        static $old_mkdir_if_not_exists = false;
        static $old_path = "";
        $count = ($count == -1) ? $this->parameters['COUNT'] : $count;
        $max_items_per_file = max(1, ($max_items_per_file == -1) ?
            $this->parameters['MAX_ITEMS_PER_FILE'] : $max_items_per_file);
        $folder = $this->folder ?? ".";
        if ($old_folder === $folder && $old_hash_key === $hash_key &&
            $old_count == $count &&
            $old_max_items_per_file == $max_items_per_file &&
            (!$mkdir_if_not_exists ||
            $old_mkdir_if_not_exists == $mkdir_if_not_exists)) {
            return $old_path;
        }
        $old_folder = $folder;
        $old_hash_key = $hash_key;
        $old_count = $count;
        $old_max_items_per_file = $max_items_per_file;
        $old_mkdir_if_not_exists = $mkdir_if_not_exists;
        // first 8 bytes of the hash, read as a big endian 64 bit integer
        $pack_hash = str_pad(substr($hash_key, 0, 8), 8, "\0", STR_PAD_LEFT);
        $hash_int = unpack("J", $pack_hash)[1];
        list($num_files, $max_num_bits, $pow_max, $threshold) =
            $this->bitStatistics($count, $max_items_per_file);
        $masked_int = ($hash_int & ($pow_max - 1));
        $num_bits = ($masked_int < $threshold) ? $max_num_bits :
            $max_num_bits - 1;
        $mask = ($num_bits == 0) ? 0 : (1 << ($num_bits)) - 1;
        $init_prefix = "";
        if ($num_files == 1 && $max_items_per_file != 1) {
            $init_prefix = "z";
        }
        $prefix = $folder;
        // add one sub-folder level for each multiple of 8 bits exceeded
        for ($i = 56, $j = 0; $i > 0; $i -= 8, $j++) {
            if ($num_bits > $i) {
                $prefix .= "/" . ord($pack_hash[$j]);
                if ($mkdir_if_not_exists && !file_exists($prefix)) {
                    mkdir($prefix);
                }
            }
        }
        $old_path = $prefix . "/" . $init_prefix .
            sprintf("%'.0{$num_bits}b",($hash_int & $mask));
        return $old_path;
    }
    /**
     * Given an item $count, and a maximum number of items in a bucket for the
     * LHT, determines the number of buckets, the maximum number of bits
     * to be used from the hash function to determine bucket,
     * 2^(max_bits - 1), and a threshold on when to use this number of bits
     * versus one less.
     * @param int $count number of items in LHT
     * @param int $max_items_per_file max items per bucket
     * @return array 4-tuple [num files, max_num_bits, 2^(max_num_bits - 1),
     *      threshold]
     */
    public function bitStatistics($count, $max_items_per_file)
    {
        $num_files = max(1, ceil($count/$max_items_per_file));
        $max_num_bits = ceil(log($num_files + 1, 2));
        $pow_max = 1 << ($max_num_bits - 1);
        $threshold = $num_files - $pow_max;
        return [$num_files, $max_num_bits, $pow_max, $threshold];
    }
    /**
     * Creates a new counter $field to be maintained (initialized to 0)
     * if it does not already exist, then saves the parameters
     *
     * @param string $field field of info struct to add a counter for
     */
    public function initCountIfNotExists($field = "COUNT")
    {
        $this->parameters[$field] ??= 0;
        $this->saveParameters();
    }
    /**
     * Add $num to maintained counter $field and saves the parameters
     *
     * @param int $num number of items to add to current count
     * @param string $field field of info struct to add to the count of
     */
    public function addCount($num, $field = "COUNT")
    {
        $this->parameters[$field] ??= 0;
        $this->parameters[$field] += $num;
        $this->saveParameters();
    }
    /**
     * Save the operating parameters of this LinearHashTable as a
     * serialized array in the parameters file of its folder
     */
    public function saveParameters()
    {
        $parameter_path = $this->folder . "/" . self::PARAMETERS_FILE;
        file_put_contents($parameter_path, serialize($this->parameters),
            LOCK_EX);
    }
    /**
     * Returns the parameters (such as its signature, max number of
     * documents per partition and counts) used to configure the
     * LinearHashTable stored at $folder
     *
     * @param string $folder folder of the LinearHashTable
     * @return array stored parameters, or an empty array if the parameter
     *      file is missing or cannot be read as a serialized array
     */
    public static function getParameterInfo($folder)
    {
        $parameter_path = $folder . "/" . self::PARAMETERS_FILE;
        if (!file_exists($parameter_path)) {
            return [];
        }
        $contents = file_get_contents($parameter_path);
        $parameters = ($contents === false) ? false : unserialize($contents);
        // unserialize gives false on a truncated/corrupt file
        return (is_array($parameters)) ? $parameters : [];
    }
    /**
     * Assuming a $value record where blob items have already been replaced by
     * their integer offsets into the archive partition, inserts in the LHT
     * a $value record according to this LHT's signature for $key.
     *
     * @param string $key key to use for insert
     * @param string $value record according to this LHT's signature
     * @param bool $is_hash_key whether or not $key has already been hashed
     *      using this LHT's hash function
     * @param int $change_count if nonzero, the table size to assume for the
     *      insert; if 0 the stored COUNT is incremented (splitting a
     *      bucket first if needed) and used
     * @param bool $bulk_insert whether the current insert is one of many
     *      (affects how long items are kept in memory before flushing)
     * @return bool success of insert (true) or failure (false)
     */
    protected function putIndex($key, $value, $is_hash_key = false,
        $change_count = 0, $bulk_insert = false)
    {
        $hash_key = ($is_hash_key) ? $key : $this->hashKey($key);
        if ($change_count == 0) {
            $count = $this->parameters['COUNT'];
            if ($count > 1 && $this->checkSplitMerge($count + 1) > 0) {
                $this->addIndex("", "");
                $this->splitMigrate();
            }
            $this->parameters['COUNT']++;
            $count = $this->parameters['COUNT'];
        } else {
            $count = $change_count;
        }
        // called only for its side effect: creates directories if needed
        $this->getHashPath($hash_key, $count, -1, true);
        return (bool)$this->addIndex($hash_key, $value, $count, $bulk_insert);
    }
    /**
     * Handles merging two buckets into one, which might happen after a delete
     * on the LHT
     */
    protected function mergeMigrate()
    {
        list(, $migrate_to_path_low, $migrate_to_path_high) =
            $this->computeMigratePaths();
        $new_count = $this->parameters['COUNT'] - 1;
        $this->insertRecordsFromIndex($migrate_to_path_low, $new_count);
        $this->insertRecordsFromIndex($migrate_to_path_high, $new_count);
        $this->unlinkHashPath($migrate_to_path_low);
        $this->unlinkHashPath($migrate_to_path_high);
    }
    /**
     * Handles splitting one
* bucket into two, which might happen after an
     * insert on the LHT
     */
    protected function splitMigrate()
    {
        list($migrate_from_path,) = $this->computeMigratePaths();
        $this->insertRecordsFromIndex($migrate_from_path);
        $this->unlinkHashPath($migrate_from_path);
    }
    /**
     * Unlinks the index file at the given $hash_path if it exists
     * @param string $hash_path path to index file (without the index
     *      file extension)
     */
    protected function unlinkHashPath($hash_path)
    {
        $index_file_name = $hash_path . self::HASH_INDEX_EXTENSION;
        if (file_exists($index_file_name)) {
            unlink($index_file_name);
        }
    }
    /**
     * Given $count items in LHT with bucket size $max_items_per_file,
     * returns the path of the bucket that a split would migrate records
     * from, and the paths of the low and high buckets computed for a table
     * of $count + 1 items
     *
     * @param int $count number of items in LHT (defaults to stored COUNT)
     * @param int $max_items_per_file maximum number of items/bucket
     *      (defaults to stored MAX_ITEMS_PER_FILE)
     * @return array [path to migrate from, path to migrate to low,
     *      path to migrate to high]
     */
    protected function computeMigratePaths($count = -1,
        $max_items_per_file = -1)
    {
        $count = ($count == -1) ? $this->parameters['COUNT'] : $count;
        $max_items_per_file = max(1, ($max_items_per_file == -1) ?
            $this->parameters['MAX_ITEMS_PER_FILE'] : $max_items_per_file);
        list($num_files, $max_num_bits, $pow_max, $threshold) =
            $this->bitStatistics($count, $max_items_per_file);
        $migrate_from_path = ($threshold >= 0) ?
            $this->getHashPath(pack("J", $threshold), $count,
            $max_items_per_file) : false;
        $migrate_to_path_high = $this->getHashPath(pack("J",
            $pow_max + $threshold - 1), $count + 1, $max_items_per_file);
        if ($threshold == 0) {
            $migrate_to_path_low = $this->getHashPath(
                pack("J", ($pow_max -1) >>1), $count + 1,
                $max_items_per_file);
        } else {
            $migrate_to_path_low = $this->getHashPath(pack("J",
                $threshold - 1), $count + 1, $max_items_per_file);
        }
        return [$migrate_from_path, $migrate_to_path_low,
            $migrate_to_path_high];
    }
    /**
     * Reads in index records from $hash_path, then reinserts them into LHT
     * under the assumption that the size of the LHT is $new_count
     * (defaults to current size + 1)
     * @param string $hash_path location of bucket to read records from
     * @param int $new_count size of LHT to assume when reinserting records
     */
    protected function insertRecordsFromIndex($hash_path, $new_count = -1)
    {
        $new_count = ($new_count == -1) ? $this->parameters['COUNT'] + 1 :
            $new_count;
        $index_path = $hash_path . self::HASH_INDEX_EXTENSION;
        $table_tools = $this->table_tools;
        // prefer the buffered copy of the bucket over re-reading the file
        $index = (!empty($this->index_buffer[$hash_path])) ?
            $this->index_buffer[$hash_path] : $table_tools->load($index_path);
        unset($this->index_buffer[$hash_path]);
        if (!empty($index)) {
            foreach ($index as $hash_key => $value) {
                // called only for its side effect of creating folders
                $this->getHashPath($hash_key, $new_count, -1, true);
                $this->addIndex($hash_key, $value, $new_count, true);
            }
            // empty key flushes the bucket still buffered by addIndex
            $this->addIndex("", "");
        }
    }
    /**
     * Add a $value to save partition of this linear hash table. The value
     * is compressed and appended to the archive file of the current save
     * partition; the file handle is cached in $this->add_archive_cache.
     *
     * @param string $value value to add
     * @return mixed array [offset into archive, length data saved,
     *      index of partition], or false if the archive file could not be
     *      opened or written
     */
    protected function addArchive($value)
    {
        list($fh, $previous_partition_filename, $previous_instance_time) =
            $this->add_archive_cache;
        $compressor = $this->compressor;
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $partition_filename = $this->getPartition($save_partition);
        if (!is_resource($fh) ||
            $partition_filename != $previous_partition_filename ||
            $previous_instance_time != $this->instance_time) {
            if (is_resource($fh)) {
                fclose($fh);
            }
            $fh = fopen($partition_filename, "c+");
            if ($fh === false) {
                /* put() tests for a false return; report the failure rather
                   than passing false on to fseek()
                 */
                $this->add_archive_cache = [null, "", -1];
                return false;
            }
            $previous_partition_filename = $partition_filename;
            $previous_instance_time = $this->instance_time;
        }
        fseek($fh, 0, SEEK_END);
        $offset = ftell($fh);
        $compressed_value = $compressor->compress($value);
        $len = strlen($compressed_value);
        $this->add_archive_cache = [$fh, $previous_partition_filename,
            $previous_instance_time];
        if ($len > 0 && fwrite($fh, $compressed_value, $len) !== $len) {
            return false;
        }
        return [$offset, $len, $save_partition];
    }
    /**
     * Add a key (not its hash) to the archive file for keys for the
     * current save partition. This is used to keep track of all the keys
     * stored in a partition.
* Note a key archive is only created if the
     * constructor $with_key_archive parameter was called with value true.
     * Keys are written encode255()'d and separated by a "\xFF" byte.
     * @param string $key key to add
     */
    protected function addKeyArchive($key)
    {
        list($fh, $previous_key_filename, $previous_instance_time) =
            $this->add_key_archive_cache;
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $key_filename = $this->getKeyPartition($save_partition);
        if (!is_resource($fh) || $key_filename != $previous_key_filename ||
            $previous_instance_time != $this->instance_time) {
            if (is_resource($fh)) {
                fclose($fh);
            }
            $fh = fopen($key_filename, "c+");
            if ($fh === false) {
                // nothing can be written; avoid fseek()/fwrite() on false
                $this->add_key_archive_cache = [null, "", -1];
                crawlLog("LinearHashTable could not open key archive");
                return;
            }
            $previous_key_filename = $key_filename;
            $previous_instance_time = $this->instance_time;
        }
        fseek($fh, 0, SEEK_END);
        /* only separate from a previous key if one is already in the file,
           so a new or empty file never begins with a separator
         */
        $separator = (ftell($fh) > 0) ? "\xFF" : "";
        $encode = $separator . encode255($key);
        $this->add_key_archive_cache = [$fh, $previous_key_filename,
            $previous_instance_time];
        fwrite($fh, $encode, strlen($encode));
    }
    /**
     * Adds the $hash_key, $value pair to the linear hash table bucket
     * (index file) for $hash_key under the assumption that $count items are
     * in the hash table. If bulk_mode is enabled then the index file
     * is kept in memory rather than immediately flushed to disk.
     *
     * @param string $hash_key key to determine hash table bucket (index file)
     * @param string $value packed table data to associate with key
     * @param int $count number of items to assume in linear hash table.
* If -1 then the saved parameter count is used
     * @param bool $bulk_mode if true the updated bucket is only kept in
     *      memory (in $this->add_index_cache) and is written to disk by a
     *      later call that targets a different bucket or that passes an
     *      empty $hash_key; if false the bucket is saved to disk immediately
     * @return bool true if the pair was added or if the call was a flush
     *      request (empty $hash_key), false if the bucket already had an
     *      entry for $hash_key
     */
    protected function addIndex($hash_key, $value, $count = -1,
        $bulk_mode = false)
    {
        list($previous_hash_path, $previous_index, $previous_instance_time) =
            $this->add_index_cache;
        $hash_path = $this->getHashPath($hash_key, $count);
        $table_tools = $this->table_tools;
        if (!empty($previous_hash_path)) {
            if ($previous_hash_path == $hash_path && $hash_key !== "" &&
                $previous_instance_time == $this->instance_time) {
                // same bucket as the previous call: reuse in-memory copy
                $index = $previous_index;
            } else {
                // different bucket or flush request: write out old bucket
                $table_tools->save($previous_hash_path .
                    self::HASH_INDEX_EXTENSION, $previous_index);
                $previous_index = [];
                $previous_hash_path = "";
                $previous_instance_time = $this->instance_time;
            }
        }
        /* An empty key is a request to flush the cached bucket only.
           empty() is also true for the string "0", which is a usable key,
           so that value is explicitly excluded from the flush case.
         */
        if (empty($hash_key) && $hash_key !== "0") {
            $this->add_index_cache = [$previous_hash_path, $previous_index,
                $previous_instance_time];
            return true;
        }
        if (empty($index)) {
            list($hash_path, $index) = $this->getIndexInfo($hash_key, $count);
        }
        if ($index) {
            $hash_key_info = $table_tools->find($index, $hash_key);
        } else {
            // getIndexInfo gives false when there is no index file yet
            $index = [];
            $hash_key_info = null;
        }
        $added = false;
        if (!$hash_key_info) {
            $table_tools->add($index, $hash_key, $value);
            if ($bulk_mode) {
                $previous_index = $index;
                $previous_hash_path = $hash_path;
            } else {
                $table_tools->save($hash_path . self::HASH_INDEX_EXTENSION,
                    $index);
                $previous_index = [];
                $previous_hash_path = "";
            }
            // keep the read cache in step with what was just added
            $this->index_buffer[$hash_path] = $index;
            $added = true;
        }
        $this->add_index_cache = [$previous_hash_path, $previous_index,
            $previous_instance_time];
        return $added;
    }
    /**
     * Computes the path to the hash partition that would contain $hash_key
     * as well as the contents of that partition. Contents are taken from
     * $this->active_index or $this->index_buffer when a non-empty copy is
     * held there, otherwise they are loaded from disk and remembered.
     * $this->index_buffer is emptied once it holds more than
     * self::INDEX_CACHE_SIZE entries.
     *
     * @param string $hash_key key to find partition for, or
     *      self::ACTIVE_INDEX to get the active partition index
     * @param int $count number of items assumed to be stored in table, if -1
     *      uses $this->parameters['COUNT']
     * @return array [hash_partition_path, hash_partition_contents]. If the
     *      index file for the partition does not exist, the second
     *      component is false
     */
    protected function getIndexInfo($hash_key, $count = -1)
    {
        if ($hash_key == self::ACTIVE_INDEX) {
            $index_path = $this->getActiveIndex();
            /* drops the last 4 characters of the path; presumably this is
               the HASH_INDEX_EXTENSION suffix -- confirm against
               getActiveIndex()
             */
            $hash_path = substr($index_path, 0, -4);
            if (!empty($this->active_index)) {
                return [$hash_path, $this->active_index];
            }
        } else {
            $count = ($count == -1) ? $this->parameters['COUNT'] : $count;
            $hash_path = $this->getHashPath($hash_key, $count);
            $index_path = $hash_path . self::HASH_INDEX_EXTENSION;
            if (!empty($this->index_buffer[$hash_path])) {
                return [$hash_path, $this->index_buffer[$hash_path]];
            }
        }
        if (!file_exists($index_path)) {
            return [$hash_path, false];
        }
        $index_data = $this->table_tools->load($index_path);
        if ($hash_key == self::ACTIVE_INDEX) {
            $this->active_index = $index_data;
            return [$hash_path, $this->active_index];
        } else {
            // crude eviction: drop the whole read cache when it is full
            if (count($this->index_buffer) > self::INDEX_CACHE_SIZE) {
                $this->index_buffer = [];
            }
            $this->index_buffer[$hash_path] = $index_data;
        }
        return [$hash_path, $index_data];
    }
    /**
     * Checks if $new_count many stored items would entail splitting
     * one of the hash partitions of this linear hash table (> 0),
     * require no change in the number of hash partitions (0), or
     * require two partitions to be merged (< 0). The value is the
     * difference between the number of files needed for $new_count items
     * and the number needed for the current COUNT parameter, at
     * MAX_ITEMS_PER_FILE items per file.
     *
     * @param int $new_count new item count
     * @return int change (< 0 merge, > 0 split) or no change required (0).
     */
    protected function checkSplitMerge($new_count)
    {
        $count = $this->parameters['COUNT'];
        $max_items_per_file = $this->parameters['MAX_ITEMS_PER_FILE'];
        // ceil() yields floats; cast so the result is an int as documented
        $current_expected_num_files = (int) ceil($count / $max_items_per_file);
        $next_expected_num_files =
            (int) ceil($new_count / $max_items_per_file);
        return $next_expected_num_files - $current_expected_num_files;
    }
}