DSORT
Start, stop, and monitor distributed parallel sorting (dSort)
For background and in-depth presentation, please see this document.
- Usage
- Example
- Generate Shards
- Start dSort job
- Show dSort jobs and job status
- Stop dSort job
- Remove dSort job
- Wait for dSort job
Usage
ais dsort [command options] [JSON_SPECIFICATION|YAML_SPECIFICATION|-] [SRC_BUCKET] [DST_BUCKET]
$ ais dsort --help
NAME:
ais dsort - (alias for "job start dsort") start dsort job
Required parameters:
- input_bck: source bucket (used as both source and destination if the latter not specified)
- input_format: (see docs and examples below)
- output_format: (ditto)
- output_shard_size: (as the name implies)
E.g. inline JSON spec:
$ ais start dsort '{
"extension": ".tar",
"input_bck": {"name": "dsort-testing"},
"input_format": {"template": "shard-{0..9}"},
"output_shard_size": "200KB",
"description": "pack records into categorized shards",
"ekm_file": "http://website.web/static/ekm_file.txt",
"ekm_file_sep": " "
}'
E.g. inline YAML spec:
$ ais start dsort -f - <<EOM
extension: .tar
input_bck:
name: dsort-testing
input_format:
template: shard-{0..9}
output_format: new-shard-{0000..1000}
output_shard_size: 10KB
description: shuffle shards from 0 to 9
algorithm:
kind: shuffle
EOM
Tip: use '--dry-run' to see the results without making any changes
Tip: use '--verbose' to print the spec (with all its parameters including applied defaults)
See also: docs/dsort.md, docs/cli/dsort.md, and ais/test/scripts/dsort*
USAGE:
ais dsort [command options] [JSON_SPECIFICATION|YAML_SPECIFICATION|-] [SRC_BUCKET] [DST_BUCKET]
OPTIONS:
--file value, -f value path to JSON or YAML job specification
--verbose, -v verbose
--help, -h show help
Example
This example runs the ais/test/scripts/dsort-ex1-spec.json specification. The source and destination buckets (ais://src and ais://dst, respectively) must exist.
Further, the source bucket must contain at least 10 shards with names that match input_format (see below).
Note the -v (--verbose) switch as well.
$ ais start dsort ais://src ais://dst -f ais/test/scripts/dsort-ex1-spec.json --verbose
PROPERTY VALUE
algorithm.content_key_type -
algorithm.decreasing false
algorithm.extension -
algorithm.kind alphanumeric
algorithm.seed -
create_concurrency_max_limit 0
description sort shards alphanumerically
dry_run false
dsorter_type -
extension .tar
extract_concurrency_max_limit 0
input_bck ais://src
input_format.objnames -
input_format.template shard-{0..9}
max_mem_usage -
ekm_file -
ekm_file_sep \t
output_bck ais://dst
output_format new-shard-{0000..1000}
output_shard_size 10KB
Config override: none
srt-M8ld-VU_i
Generate Shards
ais archive gen-shards "BUCKET/TEMPLATE.EXT"
Put randomly generated shards into a bucket. The main use case for this command is dSort testing. Further reference for this command can be found here.
Start dSort job
ais start dsort JOB_SPEC
or ais start dsort -f <PATH_TO_JOB_SPEC>
Start a new dSort job with the provided specification.
The specification must be provided either as an argument or via the -f flag; providing both results in an error.
Upon creation, the JOB_ID of the job is returned; it can then be used to abort the job or retrieve its metrics.
Flag | Type | Description | Default |
---|---|---|---|
--file, -f | string | Path to file containing JSON or YAML job specification. Providing - will result in reading from STDIN | "" |
The following table describes JSON/YAML keys which can be used in the specification.
Key | Type | Description | Required | Default |
---|---|---|---|---|
extension | string | extension of input and output shards (either .tar, .tgz or .zip) | yes | |
input_format.template | string | name template for input shard | yes | |
output_format | string | name template for output shard | yes | |
input_bck.name | string | name of the bucket where shard objects are stored | yes | |
input_bck.provider | string | bucket backend provider, see docs | no | "ais" |
output_bck.name | string | name of the bucket where new output shards will be saved | no | same as input_bck.name |
output_bck.provider | string | bucket backend provider, see docs | no | same as input_bck.provider |
description | string | description of dSort job | no | "" |
output_shard_size | string | size (in bytes) of the output shard; can be given as a raw number (10240) or with a suffix (10KB) | yes | |
algorithm.kind | string | determines which sorting algorithm the dSort job uses; available are: "alphanumeric", "shuffle", "content" | no | "alphanumeric" |
algorithm.decreasing | bool | determines whether the algorithm sorts the records in decreasing or increasing order; used for kind=alphanumeric or kind=content | no | false |
algorithm.seed | string | seed provided to the random generator; used when kind=shuffle | no | "" - time.Now() is used |
algorithm.extension | string | content of the file with the provided extension will be used as the sorting key; used when kind=content | yes (only when kind=content) | |
algorithm.content_key_type | string | content key type; may have one of the following values: “int”, “float”, or “string”; used exclusively with kind=content sorting | yes (only when kind=content) | |
ekm_file | string | URL of the file containing the external key map (it should contain lines in the format: record_key[sep]shard-%d-fmt) | yes (only when output_format not provided) | "" |
ekm_file_sep | string | separator used for splitting record_key and shard-%d-fmt in the lines of the external key map | no | \t (TAB) |
max_mem_usage | string | limits the amount of total system memory allocated by both dSort and other running processes. Once and if this threshold is crossed, dSort will continue extracting onto local drives. Can be in format 60% or 10GB | no | same as in /deploy/dev/local/aisnode_config.sh |
extract_concurrency_max_limit | int | limits the maximum number of concurrent shards extracted per disk | no | (calculated based on different factors) ~50 |
create_concurrency_max_limit | int | limits the maximum number of concurrent shards created per disk | no | (calculated based on different factors) ~50 |
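The output_shard_size key accepts either a raw byte count or a suffixed value. The following is a minimal sketch of how such a value could be normalized before writing a spec. The helper name parse_shard_size and the suffix table are assumptions made for illustration; the multiplier of 1024 is chosen only because this document equates 10KB with 10240 bytes, and the set of suffixes dSort actually accepts may differ.

```python
import re

# Assumed suffix table (binary multiples), chosen so that "10KB" == 10240 bytes
# as in this document's examples. The suffixes dSort really accepts may differ.
_UNITS = {"": 1, "B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def parse_shard_size(value: str) -> int:
    """Convert a raw number ('10240') or a suffixed size ('10KB') to bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([A-Za-z]*)\s*", value)
    if match is None:
        raise ValueError(f"unrecognized size: {value!r}")
    number, unit = match.groups()
    if unit.upper() not in _UNITS:
        raise ValueError(f"unknown size suffix: {unit!r}")
    return int(number) * _UNITS[unit.upper()]


print(parse_shard_size("10KB"))   # 10240
print(parse_shard_size("10240"))  # 10240
```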
It is also possible to override some values of the global distributed_sort config via the job specification.
All values are optional; if a value is empty, the one from the global distributed_sort config is used.
For more information refer to configuration.
Key | Type | Description |
---|---|---|
duplicated_records | string | what to do when duplicated records are found: “ignore” - ignore and continue, “warn” - notify a user and continue, “abort” - abort dSort operation |
missing_shards | string | what to do when missing shards are detected: “ignore” - ignore and continue, “warn” - notify a user and continue, “abort” - abort dSort operation |
ekm_malformed_line | string | what to do when the extraction key map contains a malformed line: “ignore” - ignore and continue, “warn” - notify a user and continue, “abort” - abort dSort operation |
ekm_missing_key | string | what to do when the extraction key map has a missing key: “ignore” - ignore and continue, “warn” - notify a user and continue, “abort” - abort dSort operation |
dsorter_mem_threshold | string | minimum free memory threshold that activates the specialized dsorter type, which uses memory in the creation phase; benchmarks show that this type of dsorter behaves better than the general type |
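The rules in the two tables above (required keys, keys required only for kind=content, and the three allowed reactions) can be checked locally before a job is submitted. The sketch below is a hypothetical pre-flight check, not the validation dSort itself performs; the function name check_spec is invented here, and the override keys are assumed to sit at the top level of the specification.

```python
ALLOWED_EXTENSIONS = {".tar", ".tgz", ".zip"}
CONTENT_KEY_TYPES = {"int", "float", "string"}
REACTIONS = {"ignore", "warn", "abort"}
# Assumption: override keys live at the top level of the job specification.
REACTION_KEYS = ("duplicated_records", "missing_shards",
                 "ekm_malformed_line", "ekm_missing_key")


def _get(spec, *path):
    """Follow nested keys; return None if any level is missing."""
    node = spec
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def check_spec(spec: dict) -> list:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    if spec.get("extension") not in ALLOWED_EXTENSIONS:
        problems.append("extension must be one of .tar, .tgz, .zip")
    for path in (("input_bck", "name"), ("input_format", "template"),
                 ("output_shard_size",)):
        if not _get(spec, *path):
            problems.append(".".join(path) + " is required")
    if not spec.get("output_format") and not spec.get("ekm_file"):
        problems.append("ekm_file is required when output_format is not provided")
    if (_get(spec, "algorithm", "kind") or "alphanumeric") == "content":
        if not _get(spec, "algorithm", "extension"):
            problems.append("algorithm.extension is required when kind=content")
        if _get(spec, "algorithm", "content_key_type") not in CONTENT_KEY_TYPES:
            problems.append("algorithm.content_key_type must be int, float or string")
    for key in REACTION_KEYS:
        if spec.get(key) and spec[key] not in REACTIONS:
            problems.append(key + " must be one of ignore, warn, abort")
    return problems


spec = {
    "extension": ".tar",
    "input_bck": {"name": "dsort-testing"},
    "input_format": {"template": "shard-{0..9}"},
    "output_format": "new-shard-{0000..1000}",
    "output_shard_size": "10KB",
}
print(check_spec(spec))  # []
```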
Examples
Sort records inside the shards
The command below starts an (alphanumeric) sorting job with extended metrics for input shards named shard-0.tar, shard-1.tar, ..., shard-9.tar.
Each of the output shards will have at least 10240 bytes (10KB) and will be named new-shard-0000.tar, new-shard-0001.tar, ...
Assuming that dsort_spec.json contains:
{
"extension": ".tar",
"input_bck": {"name": "dsort-testing"},
"input_format": {
"template": "shard-{0..9}"
},
"output_format": "new-shard-{0000..1000}",
"output_shard_size": "10KB",
"description": "sort shards from 0 to 9",
"algorithm": {
"kind": "alphanumeric"
}
}
You can start dSort job with:
$ ais start dsort -f dsort_spec.json
JGHEoo89gg
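To see which object names a template such as shard-{0..9} or new-shard-{0000..1000} stands for, the range can be expanded locally. This is a minimal sketch under stated assumptions: expand_template is a hypothetical helper, it handles a single {start..end} or {start..end..step} range only, and it takes the zero-padding width from the number of digits in the start value. It mirrors the names listed in this document rather than the template parser AIS uses.

```python
import re

_RANGE = re.compile(r"\{(\d+)\.\.(\d+)(?:\.\.(\d+))?\}")


def expand_template(template: str, extension: str = "") -> list:
    """Expand one {start..end[..step]} range into a list of object names."""
    match = _RANGE.search(template)
    if match is None:
        return [template + extension]
    start, end, step = match.group(1), match.group(2), match.group(3)
    width = len(start)  # "0000" pads to four digits, "0" does not pad
    prefix, suffix = template[:match.start()], template[match.end():]
    return [
        prefix + str(i).zfill(width) + suffix + extension
        for i in range(int(start), int(end) + 1, int(step or 1))
    ]


print(expand_template("shard-{0..9}", ".tar")[:3])
# ['shard-0.tar', 'shard-1.tar', 'shard-2.tar']
print(expand_template("new-shard-{0000..1000}", ".tar")[:2])
# ['new-shard-0000.tar', 'new-shard-0001.tar']
```

Checking the expanded input names against the bucket listing before starting a job is one way to catch missing shards early.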
Shuffle records
The command below starts a basic shuffle job for input shards named shard-0.tar, shard-1.tar, ..., shard-9.tar.
Each of the output shards will have at least 10240 bytes (10KB) and will be named new-shard-0000.tar, new-shard-0001.tar, ...
$ ais start dsort -f - <<EOM
extension: .tar
input_bck:
name: dsort-testing
input_format:
template: shard-{0..9}
output_format: new-shard-{0000..1000}
output_shard_size: 10KB
description: shuffle shards from 0 to 9
algorithm:
kind: shuffle
EOM
JGHEoo89gg
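The algorithm.seed key exists so that a shuffle can be repeated. The sketch below only illustrates that general idea with Python's standard random module: the same seed yields the same order, while a different seed is free to produce another one. It is not dSort's random generator, and the record names are made up.

```python
import random


def shuffled(records, seed):
    """Return a shuffled copy of records, deterministic for a given seed."""
    rng = random.Random(seed)  # a private generator; global state is untouched
    result = list(records)
    rng.shuffle(result)
    return result


records = ["rec-%d" % i for i in range(10)]  # hypothetical record names
first = shuffled(records, "my-seed")
second = shuffled(records, "my-seed")
print(first == second)  # True
```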
Pack records into shards with different categories - EKM (External Key Map)
One of the key features of dSort is that the user can specify the exact mapping from a record key to the output shard.
To use this feature, output_format should be empty, and both ekm_file and ekm_file_sep must be set.
The output shards will be created with the provided template format.
Assuming that ekm_file (URL: http://website.web/static/ekm_file.txt) has content:
cat_0.txt shard-cats-%d
cat_1.txt shard-cats-%d
...
dog_0.txt shard-dogs-%d
dog_1.txt shard-dogs-%d
...
car_0.txt shard-car-%d
car_1.txt shard-car-%d
...
or, if ekm_file is a JSON file (URL: http://website.web/static/ekm_file.json, notice the .json extension), has content:
{
"shard-cats-%d": [
"cat_0.txt",
"cat_1.txt",
...
],
"shard-dogs-%d": [
"dog_0.txt",
"dog_1.txt",
...
],
"shard-car-%d": [
"car_0.txt",
"car_1.txt",
...
],
...
}
Alternatively, you can use a regex as the record identifier: the ekm_file can contain regex patterns in place of record keys, so that every record matching a pattern is mapped to the given format.
{
"shard-cats-%d": [
"cat_[0-9]+\\.txt"
],
"shard-dogs-%d": [
"dog_[0-9]+\\.txt"
],
"shard-car-%d": [
"car_[0-9]+\\.txt"
],
...
}
and content of the input shards looks more or less like this:
shard-0.tar:
- cat_0.txt
- dog_0.txt
- car_0.txt
...
shard-1.tar:
- cat_1.txt
- dog_1.txt
- car_1.txt
...
You can run:
$ ais start dsort '{
"extension": ".tar",
"input_bck": {"name": "dsort-testing"},
"input_format": {"template": "shard-{0..9}"},
"output_shard_size": "200KB",
"description": "pack records into categorized shards",
"ekm_file": "http://website.web/static/ekm_file.txt",
"ekm_file_sep": " "
}'
JGHEoo89gg
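The text and JSON forms of the key map carry the same information. The following sketch converts the line-based form into the JSON-style mapping and looks up the shard format for a record, treating each entry first as a literal name and then as a regex. Both function names are hypothetical, and full-string matching is an assumption; how dSort anchors its patterns is not stated here.

```python
import re
from typing import Dict, List, Optional


def parse_ekm_text(text: str, sep: str = "\t") -> Dict[str, List[str]]:
    """Turn 'record_key[sep]shard-fmt' lines into {shard_fmt: [record_key, ...]}."""
    ekm: Dict[str, List[str]] = {}
    for line_no, line in enumerate(text.splitlines(), 1):
        if not line.strip():
            continue
        key, found, shard_fmt = line.partition(sep)
        if not found or not key.strip() or not shard_fmt.strip():
            raise ValueError("malformed EKM line %d: %r" % (line_no, line))
        ekm.setdefault(shard_fmt.strip(), []).append(key.strip())
    return ekm


def shard_format_for(record: str, ekm: Dict[str, List[str]]) -> Optional[str]:
    """Find the shard format whose entry equals or (as a regex) matches record."""
    for shard_fmt, patterns in ekm.items():
        for pattern in patterns:
            if record == pattern:
                return shard_fmt
            try:
                if re.fullmatch(pattern, record):  # assumed: whole-name match
                    return shard_fmt
            except re.error:
                continue  # entry is not a valid regex; treat it as literal only
    return None


text = "cat_0.txt shard-cats-%d\ndog_0.txt shard-dogs-%d\ncat_1.txt shard-cats-%d\n"
print(parse_ekm_text(text, sep=" "))
# {'shard-cats-%d': ['cat_0.txt', 'cat_1.txt'], 'shard-dogs-%d': ['dog_0.txt']}
```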
After the run, the output shards will look more or less like this (the number of records in a given shard depends on the provided output_shard_size):
shard-cats-0.tar:
- cat_1.txt
- cat_2.txt
shard-cats-1.tar:
- cat_3.txt
- cat_4.txt
...
shard-dogs-0.tar:
- dog_1.txt
- dog_2.txt
...
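How many records land in each output shard follows from output_shard_size. As a rough mental model only, the sketch below fills a shard until it holds at least the requested number of bytes and then moves on to the next index of the %d format. The function pack_records and the record sizes are hypothetical, and dSort's actual placement logic may differ.

```python
def pack_records(records, shard_format, shard_size, extension=".tar"):
    """Group (name, size_in_bytes) pairs into numbered shards.

    A shard is closed once it holds at least shard_size bytes; whatever
    remains at the end goes into a final, possibly smaller, shard.
    """
    shards = {}
    index, current, filled = 0, [], 0
    for name, size in records:
        current.append(name)
        filled += size
        if filled >= shard_size:
            shards[shard_format % index + extension] = current
            index, current, filled = index + 1, [], 0
    if current:
        shards[shard_format % index + extension] = current
    return shards


# Hypothetical sizes: four records of 100KB each, packed into 200KB shards.
cats = [("cat_%d.txt" % i, 100 * 1024) for i in range(1, 5)]
print(pack_records(cats, "shard-cats-%d", 200 * 1024))
# {'shard-cats-0.tar': ['cat_1.txt', 'cat_2.txt'], 'shard-cats-1.tar': ['cat_3.txt', 'cat_4.txt']}
```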
EKM also supports template syntax to express output shard names.
For example, if ekm_file has content:
{
"shard-{0..100..3}-cats": [
"cat_0.txt",
"cat_1.txt",
"cat_3.txt",
"cat_4.txt",
"cat_5.txt",
"cat_6.txt",
...
],
"shard-@00001-gap-@100-dogs": [
"dog_0.txt",
"dog_1.txt",
...
],
"shard-%06d-cars": [
"car_0.txt",
"car_1.txt",
...
],
...
}
After running dsort, the output would look like this:
shard-0-cats.tar:
- cat_0.txt
- cat_1.txt
shard-3-cats.tar:
- cat_2.txt
- cat_3.txt
shard-6-cats.tar:
- cat_4.txt
- cat_5.txt
...
shard-00001-gap-001-dogs.tar:
- dog_0.txt
- dog_1.txt
shard-00001-gap-002-dogs.tar:
- dog_2.txt
- dog_3.txt
...
shard-1-cars.tar:
- car_0.txt
- car_1.txt
shard-2-cars.tar:
- car_2.txt
- car_3.txt
...
Show dSort jobs and job status
ais show job dsort [JOB_ID]
Retrieve the status of the dSort job with the provided JOB_ID, which is returned upon creation.
List all dSort jobs if the JOB_ID argument is omitted.
Options
Flag | Type | Description | Default |
---|---|---|---|
--regex | string | Regex for the description of dSort jobs | "" |
--refresh | duration | Refresh interval - time duration between reports. The usual unit suffixes are supported and include m (for minutes), s (seconds), ms (milliseconds). E.g.: --refresh 2s | |
--verbose, -v | bool | Show detailed metrics | false |
--log | string | Path to file where the metrics will be saved (does not work with progress bar) | /tmp/dsort_run.txt |
--json, -j | bool | Show only json metrics | false |
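When a script wraps this command, it can help to convert a --refresh value into seconds, for example to size its own timeouts. The sketch below handles only the three suffixes named in the table (m, s, ms); parse_refresh is a hypothetical helper, and the CLI may well accept further suffixes that this code rejects.

```python
import re

# Milliseconds per unit, limited to the suffixes named in the table above.
_UNIT_MS = {"ms": 1, "s": 1000, "m": 60000}


def parse_refresh(value: str) -> float:
    """Convert a duration such as '500ms', '2s' or '1m' to seconds."""
    match = re.fullmatch(r"(\d+)(ms|s|m)", value.strip())
    if match is None:
        raise ValueError("unsupported duration: %r" % value)
    number, unit = match.groups()
    return int(number) * _UNIT_MS[unit] / 1000


print(parse_refresh("500ms"))  # 0.5
print(parse_refresh("2s"))     # 2.0
```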
Examples
Show dSort jobs with description matching provided regex
Show all dSort jobs whose descriptions start with the sort prefix.
$ ais show job dsort --regex "^sort (.*)"
JOB ID STATUS START FINISH DESCRIPTION
nro_Y5h9n Finished 03-16 11:39:07 03-16 11:39:07 sort shards from 0 to 9
Key_Y5h9n Finished 03-16 11:39:23 03-16 11:39:23 sort shards from 10 to 19
enq9Y5Aqn Finished 03-16 11:39:34 03-16 11:39:34 sort shards from 20 to 29
Save metrics to log file
Save newly fetched metrics of the dSort job with ID 5JjIuGemR to the /tmp/dsort_run.txt file every 500 milliseconds:
$ ais show job dsort 5JjIuGemR --refresh 500ms --log "/tmp/dsort_run.txt"
Dsort job has finished successfully in 21.948806ms:
Longest extraction: 1.49907ms
Longest sorting: 8.288299ms
Longest creation: 4.553µs
Show only json metrics
$ ais show job dsort 5JjIuGemR --json
{
"825090t8089": {
"local_extraction": {
"started_time": "2020-05-28T09:53:42.466267891-04:00",
"end_time": "2020-05-28T09:53:42.50773835-04:00",
....
},
....
},
....
}
Show only json metrics filtered by daemon id
$ ais show job dsort 5JjIuGemR 766516t8087 --json
{
"766516t8087": {
"local_extraction": {
"started_time": "2020-05-28T09:53:42.466267891-04:00",
"end_time": "2020-05-28T09:53:42.50773835-04:00",
....
},
....
}
}
Using jq to filter out the json formatted metric output
Show running status of meta sorting phase for all targets.
$ ais show job dsort 5JjIuGemR --json | jq '.[].meta_sorting.running'
false
false
true
false
Show created shards in each target along with the target ids.
$ ais show job dsort 5JjIuGemR --json | jq 'to_entries[] | [.key, .value.shard_creation.created_count]'
[
"766516t8087",
"189"
]
[
"710650t8086",
"207"
]
[
"825090t8089",
"211"
]
[
"743838t8088",
"186"
]
[
"354275t8085",
"207"
]
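Where jq is not available, the same per-target extraction can be done with a few lines of Python. This sketch assumes the metrics JSON has the shape shown in this section (a top-level object keyed by target ID, with shard_creation.created_count inside each entry) and that the count may arrive as a string, as in the jq output above. The sample document is reduced to that one field, and created_counts is a hypothetical helper.

```python
import json


def created_counts(metrics_json: str) -> dict:
    """Map each target ID to its shard_creation.created_count as an int."""
    metrics = json.loads(metrics_json)
    return {
        target: int(entry["shard_creation"]["created_count"])
        for target, entry in metrics.items()
    }


# Trimmed-down sample built from the values shown in the jq output above.
sample = json.dumps({
    "766516t8087": {"shard_creation": {"created_count": "189"}},
    "710650t8086": {"shard_creation": {"created_count": "207"}},
    "825090t8089": {"shard_creation": {"created_count": "211"}},
    "743838t8088": {"shard_creation": {"created_count": "186"}},
    "354275t8085": {"shard_creation": {"created_count": "207"}},
})
counts = created_counts(sample)
print(sum(counts.values()))  # 1000
```

In practice the input would come from the output of ais show job dsort 5JjIuGemR --json, read from standard input or from a saved file.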
Stop dSort job
ais stop dsort JOB_ID
Stop the dSort job with the given JOB_ID.
Remove dSort job
ais job rm dsort JOB_ID
Remove the finished dSort job with the given JOB_ID from the job list.
Wait for dSort job
ais wait dsort JOB_ID
or, same:
ais wait JOB_ID
Wait for the dSort job with the given JOB_ID to finish.
Options
Flag | Type | Description | Default |
---|---|---|---|
--refresh | duration | Refresh interval - time duration between reports. The usual unit suffixes are supported and include m (for minutes), s (seconds), ms (milliseconds) | 1s |
--progress | bool | Displays progress bar | false |