简介
本改造针对使用 sysbench 压测大宽表(列数超过默认的 4 列)的需求而设计。
一、安装sysbench
curl -s https://packagecloud.io/install/repositories/akopytov/sysbench/script.rpm.sh | sudo bash
yum -y install sysbench
sysbench --version
sysbench 1.0.20
二、更改sysbench的oltp_common.lua脚本,脚本文件如下:
-- Number of extra "chad_value<N>" CHAR(60) columns appended to every sbtest
-- table, in addition to the standard id, k, c and pad columns. Edit this
-- value to control how wide the benchmark tables are.
local DB_COL_NUM = 3000
--- Guard against running this shared script directly.
-- Raises an error unless a global `event` has been defined (which the
-- including OLTP script is expected to do).
function init()
   local message = "this script is meant to be included by other OLTP scripts and should not be called directly."
   assert(event ~= nil, message)
end
-- Refuse to load when sysbench was started without a command.
if sysbench.cmdline.command == nil then
   error("Command is required. Supported commands: prepare, prewarm, run, cleanup, help")
end
-- Command line options understood by the OLTP scripts.
-- Each entry maps an option name to {description, default value};
-- pgsql_variant carries a description only.
sysbench.cmdline.options = {
   table_size = {"Number of rows per table", 10000},
   range_size = {"Range size for range SELECT queries", 100},
   tables = {"Number of tables", 1},

   -- Per-transaction query counts.
   point_selects = {"Number of point SELECT queries per transaction", 10},
   simple_ranges = {"Number of simple range SELECT queries per transaction", 1},
   sum_ranges = {"Number of SELECT SUM() queries per transaction", 1},
   order_ranges = {"Number of SELECT ORDER BY queries per transaction", 1},
   distinct_ranges = {"Number of SELECT DISTINCT queries per transaction", 1},
   index_updates = {"Number of UPDATE index queries per transaction", 1},
   non_index_updates = {"Number of UPDATE non-index queries per transaction", 1},
   delete_inserts = {"Number of DELETE/INSERT combinations per transaction", 1},

   -- Behaviour switches.
   range_selects = {"Enable/disable all range SELECT queries", true},
   auto_inc = {
      "Use AUTO_INCREMENT column as Primary Key (for MySQL), or its alternatives in other DBMS. When disabled, use client-generated IDs",
      true
   },
   skip_trx = {
      "Don't start explicit transactions and execute all queries in the AUTOCOMMIT mode",
      false
   },
   secondary = {"Use a secondary index in place of the PRIMARY KEY", false},
   create_secondary = {"Create a secondary index in addition to the PRIMARY KEY", true},

   -- Driver-specific settings.
   mysql_storage_engine = {"Storage engine, if MySQL is used", "innodb"},
   pgsql_variant = {
      "Use this PostgreSQL variant when running with the PostgreSQL driver. The only currently supported variant is 'redshift'. When enabled, create_secondary is automatically disabled, and delete_inserts is set to 0"
   }
}
--- Implementation of the `prepare` command.
-- The calling thread creates every table whose number is
-- (tid % threads) + 1, then that plus `threads`, and so on up to `tables`.
function cmd_prepare()
   local driver = sysbench.sql.driver()
   local connection = driver:connect()

   local first_table = sysbench.tid % sysbench.opt.threads + 1
   local last_table = sysbench.opt.tables
   local stride = sysbench.opt.threads

   for table_num = first_table, last_table, stride do
      create_table(driver, connection, table_num)
   end
end
--- Implementation of the `prewarm` command (MySQL driver only).
-- Raises the session temp-table limits, then for each table assigned to this
-- thread runs ANALYZE TABLE followed by two SELECTs over up to `table_size`
-- rows (one forcing the PRIMARY key, one filtering on column k).
function cmd_prewarm()
   local driver = sysbench.sql.driver()
   local connection = driver:connect()

   assert(driver:name() == "mysql", "prewarm is currently MySQL only")

   -- 20 GiB, written as an expression for the server to evaluate
   connection:query("SET tmp_table_size=20*1024*1024*1024")
   connection:query("SET max_heap_table_size=20*1024*1024*1024")

   local avg_query = "SELECT AVG(id) FROM (SELECT * FROM %s FORCE KEY (PRIMARY) LIMIT %u) t"
   local count_query = "SELECT COUNT(*) FROM (SELECT * FROM %s WHERE k LIKE '%%0%%' LIMIT %u) t"

   local first_table = sysbench.tid % sysbench.opt.threads + 1
   for table_num = first_table, sysbench.opt.tables, sysbench.opt.threads do
      local table_name = "sbtest" .. table_num
      print("Prewarming table " .. table_name)
      connection:query("ANALYZE TABLE " .. table_name)
      connection:query(string.format(avg_query, table_name, sysbench.opt.table_size))
      connection:query(string.format(count_query, table_name, sysbench.opt.table_size))
   end
end
-- Register the custom commands; both are flagged with PARALLEL_COMMAND.
do
   local parallel = sysbench.cmdline.PARALLEL_COMMAND
   sysbench.cmdline.commands = {
      prepare = {cmd_prepare, parallel},
      prewarm = {cmd_prewarm, parallel}
   }
end
-- Templates handed to sysbench.rand.string / set_rand_str to produce column
-- values. Each template is a number of 11-character "#" groups joined by "-":
--   c_value_template    10 groups (119 characters, for the CHAR(120) column c)
--   pad_value_template   5 groups ( 59 characters, for the CHAR(60) column pad)
--   chad_value_template  2 groups ( 23 characters, for the chad_value columns)
-- NOTE(review): "#" is presumably replaced by a random digit; confirm against
-- the sysbench documentation.
local c_value_template, pad_value_template, chad_value_template
do
   local group = "###########"
   local function grouped(count)
      return string.rep(group .. "-", count - 1) .. group
   end
   c_value_template = grouped(10)
   pad_value_template = grouped(5)
   chad_value_template = grouped(2)
end
--- Random value for column `c`, generated from c_value_template.
function get_c_value()
   local template = c_value_template
   return sysbench.rand.string(template)
end

--- Random value for column `pad`, generated from pad_value_template.
function get_pad_value()
   local template = pad_value_template
   return sysbench.rand.string(template)
end

--- Random value for the chad_value columns, generated from chad_value_template.
function get_chad_value()
   local template = chad_value_template
   return sysbench.rand.string(template)
end
--- Create table `sbtest<table_num>`, optionally index it, and bulk-load rows.
--
-- The table has the standard columns (id, k, c, pad) followed by DB_COL_NUM
-- extra columns named chad_value1 .. chad_value<DB_COL_NUM>, each CHAR(60).
-- `sysbench.opt.table_size` rows are inserted through the bulk-insert API.
--
-- @param drv        SQL driver object; drv:name() selects the DDL dialect
-- @param con        connection used for every statement
-- @param table_num  numeric suffix of the table to create
-- Raises an error when the driver is not mysql, attachsql, drizzle or pgsql.
function create_table(drv, con, table_num)
   local opt = sysbench.opt
   local driver_name = drv:name()
   local id_index_def, id_def
   local engine_def = ""
   local extra_table_options = ""

   if opt.secondary then
      id_index_def = "KEY xid"
   else
      id_index_def = "PRIMARY KEY"
   end

   if driver_name == "mysql" or driver_name == "attachsql" or driver_name == "drizzle" then
      if opt.auto_inc then
         id_def = "INTEGER NOT NULL AUTO_INCREMENT"
      else
         id_def = "INTEGER NOT NULL"
      end
      engine_def = "/*! ENGINE = " .. opt.mysql_storage_engine .. " */"
      -- mysql_table_options is an optional global; it is not declared in
      -- sysbench.cmdline.options in this file.
      extra_table_options = mysql_table_options or ""
   elseif driver_name == "pgsql" then
      if not opt.auto_inc then
         id_def = "INTEGER NOT NULL"
      elseif opt.pgsql_variant == 'redshift' then
         -- Read through sysbench.opt like every other option instead of
         -- relying on a bare global of the same name.
         id_def = "INTEGER IDENTITY(1,1)"
      else
         id_def = "SERIAL"
      end
   else
      error("Unsupported database driver:" .. driver_name)
   end

   -- Collect the definitions and names of the extra columns once; joining
   -- with table.concat avoids quadratic string concatenation.
   local column_defs = {}
   local column_names = {}
   for col = 1, DB_COL_NUM do
      column_defs[col] = string.format("chad_value%d CHAR(60) DEFAULT '' NOT NULL,\n", col)
      column_names[col] = string.format(", chad_value%d", col)
   end

   print(string.format("Creating table 'sbtest%d'...", table_num))

   local create_query = string.format(
      "CREATE TABLE sbtest%d(\n" ..
      "id %s,\n" ..
      "k INTEGER DEFAULT '0' NOT NULL,\n" ..
      "c CHAR(120) DEFAULT '' NOT NULL,\n" ..
      "pad CHAR(60) DEFAULT '' NOT NULL,\n" ..
      "%s" ..
      "%s (id)\n" ..
      ") %s %s\n",
      table_num, id_def, table.concat(column_defs), id_index_def,
      engine_def, extra_table_options)
   con:query(create_query)

   if opt.create_secondary then
      print(string.format("Creating a secondary index on 'sbtest%d'...", table_num))
      con:query(string.format("CREATE INDEX k_%d ON sbtest%d(k)", table_num, table_num))
   end

   if (opt.table_size > 0) then
      print(string.format("Inserting %d records into 'sbtest%d'", opt.table_size, table_num))
   end

   -- With auto_inc the id column is left to the database; otherwise the row
   -- number is supplied as the id.
   local leading_columns = opt.auto_inc and "(k, c, pad" or "(id, k, c, pad"
   local insert_prefix = "INSERT INTO sbtest" .. table_num .. leading_columns ..
      table.concat(column_names) .. ") VALUES"

   con:bulk_insert_init(insert_prefix)

   for row_id = 1, opt.table_size do
      local c_val = get_c_value()
      local pad_val = get_pad_value()
      local chad_val = get_chad_value()
      local k_val = sb_rand(1, opt.table_size)

      local row_head
      if opt.auto_inc then
         row_head = string.format("(%d, '%s', '%s'", k_val, c_val, pad_val)
      else
         row_head = string.format("(%d, %d, '%s', '%s'", row_id, k_val, c_val, pad_val)
      end

      -- Every chad_value column of a row receives the same random string.
      -- The values are appended after formatting, so they are never
      -- interpreted as part of a format template.
      local chad_values = string.rep(", '" .. chad_val .. "'", DB_COL_NUM)

      con:bulk_insert_next(row_head .. chad_values .. ")")
   end

   con:bulk_insert_done()
end
local t = sysbench.sql.type

-- Prepared statement definitions, keyed by statement name. Each entry is
-- {sql_template, bind_type, ...}: the template takes the table number through
-- %u, and every following element describes one "?" placeholder, either as a
-- bare type or as {type, max_length}.
local stmt_defs = {
   point_selects = {"SELECT c FROM sbtest%u WHERE id=?", t.INT},
   simple_ranges = {"SELECT c FROM sbtest%u WHERE id BETWEEN ? AND ?", t.INT, t.INT},
   sum_ranges = {"SELECT SUM(k) FROM sbtest%u WHERE id BETWEEN ? AND ?", t.INT, t.INT},
   order_ranges = {"SELECT c FROM sbtest%u WHERE id BETWEEN ? AND ? ORDER BY c", t.INT, t.INT},
   distinct_ranges = {"SELECT DISTINCT c FROM sbtest%u WHERE id BETWEEN ? AND ? ORDER BY c", t.INT, t.INT},
   index_updates = {"UPDATE sbtest%u SET k=k+1 WHERE id=?", t.INT},
   non_index_updates = {"UPDATE sbtest%u SET c=? WHERE id=?", {t.CHAR, 120}, t.INT},
   deletes = {"DELETE FROM sbtest%u WHERE id=?", t.INT}
}

-- The INSERT statement has to supply all 4 + DB_COL_NUM columns, so it is
-- generated here. It is built in a do-block so that the scratch variables do
-- not leak into file scope, and joined with table.concat to avoid quadratic
-- string concatenation.
do
   local columns = {"id", "k", "c", "pad"}
   local placeholders = {"?", "?", "?", "?"}
   -- Slot 1 is filled with the SQL text once the column list is complete.
   local inserts = {false, t.INT, t.INT, {t.CHAR, 120}, {t.CHAR, 60}}

   for col = 1, DB_COL_NUM do
      columns[#columns + 1] = string.format("chad_value%d", col)
      placeholders[#placeholders + 1] = "?"
      inserts[#inserts + 1] = {t.CHAR, 60}
   end

   inserts[1] = "INSERT INTO sbtest%u (" .. table.concat(columns, ", ") ..
      ") VALUES (" .. table.concat(placeholders, ", ") .. ")"

   stmt_defs.inserts = inserts
end
--- Prepare the BEGIN statement and store it in stmt.begin.
function prepare_begin()
   local begin_stmt = con:prepare("BEGIN")
   stmt.begin = begin_stmt
end

--- Prepare the COMMIT statement and store it in stmt.commit.
function prepare_commit()
   local commit_stmt = con:prepare("COMMIT")
   stmt.commit = commit_stmt
end
--- Prepare the statement named `key` once per table and bind its parameters.
-- For every table number the prepared statement is stored in stmt[n][key].
-- When the definition lists bind types, one bind object per placeholder is
-- created, stored in param[n][key] and bound to the statement.
-- @param key  name of an entry in stmt_defs
function prepare_for_each_table(key)
   local sql_type = sysbench.sql.type

   for table_num = 1, sysbench.opt.tables do
      local def = stmt_defs[key]
      local statement = con:prepare(string.format(def[1], table_num))
      stmt[table_num][key] = statement

      -- the first element is the SQL text, the rest are bind descriptions
      local bind_count = #def - 1
      if bind_count > 0 then
         local binds = {}
         param[table_num][key] = binds

         for p = 1, bind_count do
            local spec = def[p + 1]
            local bind_type, max_len = spec, nil
            if type(spec) == "table" then
               -- {type, max_length} form
               bind_type, max_len = spec[1], spec[2]
            end

            if bind_type == sql_type.VARCHAR or bind_type == sql_type.CHAR then
               binds[p] = statement:bind_create(bind_type, max_len)
            else
               binds[p] = statement:bind_create(bind_type)
            end
         end

         statement:bind_param(unpack(binds))
      end
   end
end
-- Thin wrappers that prepare one statement kind for every table.

--- Prepare the point SELECT statements.
function prepare_point_selects()
   local key = "point_selects"
   prepare_for_each_table(key)
end

--- Prepare the simple range SELECT statements.
function prepare_simple_ranges()
   local key = "simple_ranges"
   prepare_for_each_table(key)
end

--- Prepare the SELECT SUM() range statements.
function prepare_sum_ranges()
   local key = "sum_ranges"
   prepare_for_each_table(key)
end

--- Prepare the SELECT ... ORDER BY range statements.
function prepare_order_ranges()
   local key = "order_ranges"
   prepare_for_each_table(key)
end

--- Prepare the SELECT DISTINCT range statements.
function prepare_distinct_ranges()
   local key = "distinct_ranges"
   prepare_for_each_table(key)
end

--- Prepare the UPDATE statements that modify column k.
function prepare_index_updates()
   local key = "index_updates"
   prepare_for_each_table(key)
end

--- Prepare the UPDATE statements that modify column c.
function prepare_non_index_updates()
   local key = "non_index_updates"
   prepare_for_each_table(key)
end

--- Prepare both halves of the DELETE/INSERT pair, DELETE first.
function prepare_delete_inserts()
   for _, key in ipairs({"deletes", "inserts"}) do
      prepare_for_each_table(key)
   end
end
--- Per-thread setup.
-- Opens a connection and initialises the globals used by the rest of this
-- script (drv, con, stmt, param), then calls prepare_statements(), which is
-- not defined in this file.
function thread_init()
   drv = sysbench.sql.driver()
   con = drv:connect()

   -- one sub-table of statements / bind parameters per table number
   stmt, param = {}, {}
   for table_num = 1, sysbench.opt.tables do
      stmt[table_num], param[table_num] = {}, {}
   end

   prepare_statements()
end
--- Close every prepared statement held in `stmt`.
-- Per-table statements are closed first, then the BEGIN and COMMIT
-- statements if they were prepared.
function close_statements()
   for table_num = 1, sysbench.opt.tables do
      for _, statement in pairs(stmt[table_num]) do
         statement:close()
      end
   end

   for _, name in ipairs({"begin", "commit"}) do
      local statement = stmt[name]
      if statement ~= nil then
         statement:close()
      end
   end
end
--- Per-thread teardown: close all prepared statements, then disconnect.
function thread_done()
   close_statements()
   local connection = con
   connection:disconnect()
end
--- Implementation of the `cleanup` command.
-- Drops sbtest1 .. sbtest<tables> over a fresh connection, logging each drop.
function cleanup()
   local connection = sysbench.sql.driver():connect()

   for table_num = 1, sysbench.opt.tables do
      print(string.format("Dropping table 'sbtest%d'...", table_num))
      connection:query("DROP TABLE IF EXISTS sbtest" .. table_num)
   end
end
--- Pick a table number in [1, tables] using sysbench.rand.uniform.
local function get_table_num()
   local table_count = sysbench.opt.tables
   return sysbench.rand.uniform(1, table_count)
end

--- Pick a row id in [1, table_size] using sysbench.rand.default.
local function get_id()
   local row_count = sysbench.opt.table_size
   return sysbench.rand.default(1, row_count)
end
--- Execute the prepared BEGIN statement.
function begin()
   local begin_stmt = stmt.begin
   begin_stmt:execute()
end

--- Execute the prepared COMMIT statement.
function commit()
   local commit_stmt = stmt.commit
   commit_stmt:execute()
end
--- Run `point_selects` point SELECTs against one randomly chosen table,
-- each with a freshly drawn id.
function execute_point_selects()
   local table_num = get_table_num()

   for _ = 1, sysbench.opt.point_selects do
      local id_param = param[table_num].point_selects[1]
      id_param:set(get_id())
      stmt[table_num].point_selects:execute()
   end
end
--- Run the range statement named `key` sysbench.opt[key] times against one
-- randomly chosen table. Each run covers ids [id, id + range_size - 1] for a
-- freshly drawn id.
-- @param key  statement name, also used as the option name for the count
local function execute_range(key)
   local table_num = get_table_num()

   for _ = 1, sysbench.opt[key] do
      local first_id = get_id()
      local last_id = first_id + sysbench.opt.range_size - 1
      local bounds = param[table_num][key]
      bounds[1]:set(first_id)
      bounds[2]:set(last_id)
      stmt[table_num][key]:execute()
   end
end
-- Thin wrappers that run one kind of range query via execute_range.

--- Run the simple range SELECT queries.
function execute_simple_ranges()
   local key = "simple_ranges"
   execute_range(key)
end

--- Run the SELECT SUM() range queries.
function execute_sum_ranges()
   local key = "sum_ranges"
   execute_range(key)
end

--- Run the SELECT ... ORDER BY range queries.
function execute_order_ranges()
   local key = "order_ranges"
   execute_range(key)
end

--- Run the SELECT DISTINCT range queries.
function execute_distinct_ranges()
   local key = "distinct_ranges"
   execute_range(key)
end
--- Run `index_updates` UPDATEs of column k against one randomly chosen
-- table, each targeting a freshly drawn id.
function execute_index_updates()
   local table_num = get_table_num()

   for _ = 1, sysbench.opt.index_updates do
      local id_param = param[table_num].index_updates[1]
      id_param:set(get_id())
      stmt[table_num].index_updates:execute()
   end
end
--- Run `non_index_updates` UPDATEs of column c against one randomly chosen
-- table. Each run sets c to a random string built from c_value_template for
-- a freshly drawn id.
function execute_non_index_updates()
   local table_num = get_table_num()

   for _ = 1, sysbench.opt.non_index_updates do
      local binds = param[table_num].non_index_updates
      binds[1]:set_rand_str(c_value_template)
      binds[2]:set(get_id())
      stmt[table_num].non_index_updates:execute()
   end
end
--- Run `delete_inserts` DELETE + INSERT pairs against one randomly chosen
-- table. Each pair deletes the row with a freshly drawn id and re-inserts a
-- row with the same id, a random k, and random strings for c, pad and every
-- extra chad_value column.
function execute_delete_inserts()
   local table_num = get_table_num()

   for _ = 1, sysbench.opt.delete_inserts do
      local id = get_id()
      local k = get_id()
      local insert_params = param[table_num].inserts

      param[table_num].deletes[1]:set(id)

      insert_params[1]:set(id)
      insert_params[2]:set(k)
      insert_params[3]:set_rand_str(c_value_template)
      insert_params[4]:set_rand_str(pad_value_template)

      -- Slots 5 and up belong to the extra columns. Iterate over the binds
      -- that actually exist: the INSERT has 4 + DB_COL_NUM placeholders, so
      -- an upper bound of 5 + DB_COL_NUM would index one slot past the end.
      for p = 5, #insert_params do
         insert_params[p]:set_rand_str(chad_value_template)
      end

      stmt[table_num].deletes:execute()
      stmt[table_num].inserts:execute()
   end
end
--- Hook invoked with an error description before an event is restarted.
-- When the error number indicates a lost connection, all prepared statements
-- are closed and prepared again.
-- @param errdesc  error description; only its sql_errno field is read
function sysbench.hooks.before_restart_event(errdesc)
   local connection_lost =
      errdesc.sql_errno == 2013 or -- CR_SERVER_LOST
      errdesc.sql_errno == 2055 or -- CR_SERVER_LOST_EXTENDED
      errdesc.sql_errno == 2006 or -- CR_SERVER_GONE_ERROR
      errdesc.sql_errno == 2011    -- CR_TCP_CONNECTION

   if not connection_lost then
      return
   end

   close_statements()
   prepare_statements()
end
以上为修改后的完整脚本。可以通过修改脚本开头的 DB_COL_NUM 来指定在默认 4 列(id、k、c、pad)之外额外追加的列数,默认值为 3000,即每张压测表共 3004 列。
三、构建数据命令
3.1、创建sysbench连接信息文件,格式如下:
vim config
###########################
mysql-host=${ip}
mysql-port=4000
mysql-password=password
mysql-user=root
mysql-db=sbtest
time=300
threads=16
report-interval=10
db-driver=mysql
3.2、执行构建数据命令
sysbench --config-file=config oltp_point_select --tables=1 --table-size=100000 prepare
四、压测命令
4.1、Point select
sysbench --config-file=config oltp_point_select --tables=1 --table-size=100000 --db-ps-mode=auto --rand-type=uniform run
4.2、Update index
sysbench --config-file=config oltp_update_index --tables=1 --table-size=100000 --db-ps-mode=auto --rand-type=uniform run
4.3、Read-only
sysbench --config-file=config oltp_read_only --tables=1 --table-size=100000 --db-ps-mode=auto --rand-type=uniform run