424 lines
43 KiB
HTML
424 lines
43 KiB
HTML
<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><meta name="generator" content="rustdoc"><meta name="description" content="Synchronization primitives for use in asynchronous contexts."><meta name="keywords" content="rust, rustlang, rust-lang, sync"><title>tokio::sync - Rust</title><link rel="preload" as="font" type="font/woff2" crossorigin href="../../SourceSerif4-Regular.ttf.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../../FiraSans-Regular.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../../FiraSans-Medium.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../../SourceCodePro-Regular.ttf.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../../SourceSerif4-Bold.ttf.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../../SourceCodePro-Semibold.ttf.woff2"><link rel="stylesheet" type="text/css" href="../../normalize.css"><link rel="stylesheet" type="text/css" href="../../rustdoc.css" id="mainThemeStyle"><link rel="stylesheet" type="text/css" href="../../ayu.css" disabled><link rel="stylesheet" type="text/css" href="../../dark.css" disabled><link rel="stylesheet" type="text/css" href="../../light.css" id="themeStyle"><script id="default-settings" ></script><script src="../../storage.js"></script><script src="../../crates.js"></script><script defer src="../../main.js"></script>
|
||
<noscript><link rel="stylesheet" href="../../noscript.css"></noscript><link rel="alternate icon" type="image/png" href="../../favicon-16x16.png"><link rel="alternate icon" type="image/png" href="../../favicon-32x32.png"><link rel="icon" type="image/svg+xml" href="../../favicon.svg"></head><body class="rustdoc mod"><!--[if lte IE 11]><div class="warning">This old browser is unsupported and will most likely display funky things.</div><![endif]--><nav class="sidebar"><div class="sidebar-menu" role="button">☰</div><a class="sidebar-logo" href="../../tokio/index.html"><div class="logo-container"><img class="rust-logo" src="../../rust-logo.png" alt="logo"></div>
|
||
</a><h2 class="location">Module sync</h2><div class="sidebar-elems"><div class="block items"><ul><li><a href="#modules">Modules</a></li><li><a href="#structs">Structs</a></li><li><a href="#enums">Enums</a></li></ul></div><div id="sidebar-vars" data-name="sync" data-ty="mod" data-relpath="./"></div><script defer src="./sidebar-items.js"></script></div></nav><main><div class="width-limiter"><div class="sub-container"><a class="sub-logo-container" href="../../tokio/index.html"><img class="rust-logo" src="../../rust-logo.png" alt="logo"></a><nav class="sub"><div class="theme-picker"><button id="theme-picker" aria-label="Pick another theme!" aria-haspopup="menu" title="themes"><img width="18" height="18" alt="Pick another theme!" src="../../brush.svg"></button><div id="theme-choices" role="menu"></div></div><form class="search-form"><div class="search-container"><div><select id="crate-search"><option value="All crates">All crates</option></select><input class="search-input" name="search" autocomplete="off" spellcheck="false" placeholder="Click or press ‘S’ to search, ‘?’ for more options…" type="search"></div><button type="button" id="help-button" title="help">?</button><a id="settings-menu" href="../../settings.html" title="settings"><img width="18" height="18" alt="Change settings" src="../../wheel.svg"></a></div></form></nav></div><section id="main-content" class="content"><h1 class="fqn"><span class="in-band">Module <a href="../index.html">tokio</a>::<wbr><a class="mod" href="#">sync</a><button id="copy-path" onclick="copy_path(this)" title="Copy item path to clipboard"><img src="../../clipboard.svg" width="19" height="18" alt="Copy item path"></button></span><span class="out-of-band"><span id="render-detail"><a id="toggle-all-docs" href="javascript:void(0)" title="collapse all docs">[<span class="inner">−</span>]</a></span><a class="srclink" href="../../src/tokio/sync/mod.rs.html#1-499" title="goto source code">[src]</a></span></h1><details class="rustdoc-toggle top-doc" open><summary class="hideme"><span>Expand description</span></summary><div class="docblock"><p>Synchronization primitives for use in asynchronous contexts.</p>
|
||
<p>Tokio programs tend to be organized as a set of <a href="../task/index.html">tasks</a> where each task
|
||
operates independently and may be executed on separate physical threads. The
|
||
synchronization primitives provided in this module permit these independent
|
||
tasks to communicate together.</p>
|
||
<h2 id="message-passing" class="section-header"><a href="#message-passing">Message passing</a></h2>
|
||
<p>The most common form of synchronization in a Tokio program is message
|
||
passing. Two tasks operate independently and send messages to each other to
|
||
synchronize. Doing so has the advantage of avoiding shared state.</p>
|
||
<p>Message passing is implemented using channels. A channel supports sending a
|
||
message from one producer task to one or more consumer tasks. There are a
|
||
few flavors of channels provided by Tokio. Each channel flavor supports
|
||
different message passing patterns. When a channel supports multiple
|
||
producers, many separate tasks may <strong>send</strong> messages. When a channel
|
||
supports multiple consumers, many different separate tasks may <strong>receive</strong>
|
||
messages.</p>
|
||
<p>Tokio provides many different channel flavors as different message passing
|
||
patterns are best handled with different implementations.</p>
|
||
<h3 id="oneshot-channel" class="section-header"><a href="#oneshot-channel"><code>oneshot</code> channel</a></h3>
|
||
<p>The <a href="oneshot/index.html"><code>oneshot</code> channel</a> supports sending a <strong>single</strong> value from a
|
||
single producer to a single consumer. This channel is usually used to send
|
||
the result of a computation to a waiter.</p>
|
||
<p><strong>Example:</strong> using a <a href="oneshot/index.html"><code>oneshot</code> channel</a> to receive the result of a
|
||
computation.</p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use</span> <span class="ident">tokio::sync::oneshot</span>;
|
||
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">some_computation</span>() -> <span class="ident">String</span> {
|
||
<span class="string">"represents the result of the computation"</span>.<span class="ident">to_string</span>()
|
||
}
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
|
||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">oneshot::channel</span>();
|
||
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">some_computation</span>().<span class="kw">await</span>;
|
||
<span class="ident">tx</span>.<span class="ident">send</span>(<span class="ident">res</span>).<span class="ident">unwrap</span>();
|
||
});
|
||
|
||
<span class="comment">// Do other work while the computation is happening in the background</span>
|
||
|
||
<span class="comment">// Wait for the computation result</span>
|
||
<span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
}</code></pre></div>
|
||
<p>Note, if the task produces a computation result as its final
|
||
action before terminating, the <a href="../task/struct.JoinHandle.html"><code>JoinHandle</code></a> can be used to
|
||
receive that value instead of allocating resources for the
|
||
<code>oneshot</code> channel. Awaiting on <a href="../task/struct.JoinHandle.html"><code>JoinHandle</code></a> returns <code>Result</code>. If
|
||
the task panics, the <code>Joinhandle</code> yields <code>Err</code> with the panic
|
||
cause.</p>
|
||
<p><strong>Example:</strong></p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">async</span> <span class="kw">fn</span> <span class="ident">some_computation</span>() -> <span class="ident">String</span> {
|
||
<span class="string">"the result of the computation"</span>.<span class="ident">to_string</span>()
|
||
}
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
|
||
<span class="kw">let</span> <span class="ident">join_handle</span> <span class="op">=</span> <span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="ident">some_computation</span>().<span class="kw">await</span>
|
||
});
|
||
|
||
<span class="comment">// Do other work while the computation is happening in the background</span>
|
||
|
||
<span class="comment">// Wait for the computation result</span>
|
||
<span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">join_handle</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
}</code></pre></div>
|
||
<h3 id="mpsc-channel" class="section-header"><a href="#mpsc-channel"><code>mpsc</code> channel</a></h3>
|
||
<p>The <a href="mpsc/index.html"><code>mpsc</code> channel</a> supports sending <strong>many</strong> values from <strong>many</strong>
|
||
producers to a single consumer. This channel is often used to send work to a
|
||
task or to receive the result of many computations.</p>
|
||
<p><strong>Example:</strong> using an mpsc to incrementally stream the results of a series
|
||
of computations.</p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use</span> <span class="ident">tokio::sync::mpsc</span>;
|
||
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">some_computation</span>(<span class="ident">input</span>: <span class="ident">u32</span>) -> <span class="ident">String</span> {
|
||
<span class="macro">format!</span>(<span class="string">"the result of computation {}"</span>, <span class="ident">input</span>)
|
||
}
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
|
||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="kw-2">mut</span> <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">mpsc::channel</span>(<span class="number">100</span>);
|
||
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="kw">for</span> <span class="ident">i</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">10</span> {
|
||
<span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">some_computation</span>(<span class="ident">i</span>).<span class="kw">await</span>;
|
||
<span class="ident">tx</span>.<span class="ident">send</span>(<span class="ident">res</span>).<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
}
|
||
});
|
||
|
||
<span class="kw">while</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="ident">res</span>) <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">recv</span>().<span class="kw">await</span> {
|
||
<span class="macro">println!</span>(<span class="string">"got = {}"</span>, <span class="ident">res</span>);
|
||
}
|
||
}</code></pre></div>
|
||
<p>The argument to <code>mpsc::channel</code> is the channel capacity. This is the maximum
|
||
number of values that can be stored in the channel pending receipt at any
|
||
given time. Properly setting this value is key in implementing robust
|
||
programs as the channel capacity plays a critical part in handling back
|
||
pressure.</p>
|
||
<p>A common concurrency pattern for resource management is to spawn a task
|
||
dedicated to managing that resource and using message passing between other
|
||
tasks to interact with the resource. The resource may be anything that may
|
||
not be concurrently used. Some examples include a socket and program state.
|
||
For example, if multiple tasks need to send data over a single socket, spawn
|
||
a task to manage the socket and use a channel to synchronize.</p>
|
||
<p><strong>Example:</strong> sending data from many tasks over a single socket using message
|
||
passing.</p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use</span> <span class="ident">tokio::io</span>::{<span class="self">self</span>, <span class="ident">AsyncWriteExt</span>};
|
||
<span class="kw">use</span> <span class="ident">tokio::net::TcpStream</span>;
|
||
<span class="kw">use</span> <span class="ident">tokio::sync::mpsc</span>;
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() -> <span class="ident">io::Result</span><span class="op"><</span>()<span class="op">></span> {
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">socket</span> <span class="op">=</span> <span class="ident">TcpStream::connect</span>(<span class="string">"www.example.com:1234"</span>).<span class="kw">await</span><span class="question-mark">?</span>;
|
||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="kw-2">mut</span> <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">mpsc::channel</span>(<span class="number">100</span>);
|
||
|
||
<span class="kw">for</span> <span class="kw">_</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">10</span> {
|
||
<span class="comment">// Each task needs its own `tx` handle. This is done by cloning the</span>
|
||
<span class="comment">// original handle.</span>
|
||
<span class="kw">let</span> <span class="ident">tx</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">clone</span>();
|
||
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="ident">tx</span>.<span class="ident">send</span>(<span class="kw-2">&</span><span class="string">b"data to write"</span>[..]).<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
});
|
||
}
|
||
|
||
<span class="comment">// The `rx` half of the channel returns `None` once **all** `tx` clones</span>
|
||
<span class="comment">// drop. To ensure `None` is returned, drop the handle owned by the</span>
|
||
<span class="comment">// current task. If this `tx` handle is not dropped, there will always</span>
|
||
<span class="comment">// be a single outstanding `tx` handle.</span>
|
||
<span class="ident">drop</span>(<span class="ident">tx</span>);
|
||
|
||
<span class="kw">while</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="ident">res</span>) <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">recv</span>().<span class="kw">await</span> {
|
||
<span class="ident">socket</span>.<span class="ident">write_all</span>(<span class="ident">res</span>).<span class="kw">await</span><span class="question-mark">?</span>;
|
||
}
|
||
|
||
<span class="prelude-val">Ok</span>(())
|
||
}</code></pre></div>
|
||
<p>The <a href="mpsc/index.html"><code>mpsc</code></a> and <a href="oneshot/index.html"><code>oneshot</code></a> channels can be combined to
|
||
provide a request / response type synchronization pattern with a shared
|
||
resource. A task is spawned to synchronize a resource and waits on commands
|
||
received on a <a href="mpsc/index.html"><code>mpsc</code></a> channel. Each command includes a
|
||
<a href="oneshot/index.html"><code>oneshot</code></a> <code>Sender</code> on which the result of the command is sent.</p>
|
||
<p><strong>Example:</strong> use a task to synchronize a <code>u64</code> counter. Each task sends an
|
||
“fetch and increment” command. The counter value <strong>before</strong> the increment is
|
||
sent over the provided <code>oneshot</code> channel.</p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use</span> <span class="ident">tokio::sync</span>::{<span class="ident">oneshot</span>, <span class="ident">mpsc</span>};
|
||
<span class="kw">use</span> <span class="ident">Command::Increment</span>;
|
||
|
||
<span class="kw">enum</span> <span class="ident">Command</span> {
|
||
<span class="ident">Increment</span>,
|
||
<span class="comment">// Other commands can be added here</span>
|
||
}
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
|
||
<span class="kw">let</span> (<span class="ident">cmd_tx</span>, <span class="kw-2">mut</span> <span class="ident">cmd_rx</span>) <span class="op">=</span> <span class="ident">mpsc::channel</span>::<span class="op"><</span>(<span class="ident">Command</span>, <span class="ident">oneshot::Sender</span><span class="op"><</span><span class="ident">u64</span><span class="op">></span>)<span class="op">></span>(<span class="number">100</span>);
|
||
|
||
<span class="comment">// Spawn a task to manage the counter</span>
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">counter</span>: <span class="ident">u64</span> <span class="op">=</span> <span class="number">0</span>;
|
||
|
||
<span class="kw">while</span> <span class="kw">let</span> <span class="prelude-val">Some</span>((<span class="ident">cmd</span>, <span class="ident">response</span>)) <span class="op">=</span> <span class="ident">cmd_rx</span>.<span class="ident">recv</span>().<span class="kw">await</span> {
|
||
<span class="kw">match</span> <span class="ident">cmd</span> {
|
||
<span class="ident">Increment</span> => {
|
||
<span class="kw">let</span> <span class="ident">prev</span> <span class="op">=</span> <span class="ident">counter</span>;
|
||
<span class="ident">counter</span> <span class="op">+</span><span class="op">=</span> <span class="number">1</span>;
|
||
<span class="ident">response</span>.<span class="ident">send</span>(<span class="ident">prev</span>).<span class="ident">unwrap</span>();
|
||
}
|
||
}
|
||
}
|
||
});
|
||
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">join_handles</span> <span class="op">=</span> <span class="macro">vec!</span>[];
|
||
|
||
<span class="comment">// Spawn tasks that will send the increment command.</span>
|
||
<span class="kw">for</span> <span class="kw">_</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">10</span> {
|
||
<span class="kw">let</span> <span class="ident">cmd_tx</span> <span class="op">=</span> <span class="ident">cmd_tx</span>.<span class="ident">clone</span>();
|
||
|
||
<span class="ident">join_handles</span>.<span class="ident">push</span>(<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="kw">let</span> (<span class="ident">resp_tx</span>, <span class="ident">resp_rx</span>) <span class="op">=</span> <span class="ident">oneshot::channel</span>();
|
||
|
||
<span class="ident">cmd_tx</span>.<span class="ident">send</span>((<span class="ident">Increment</span>, <span class="ident">resp_tx</span>)).<span class="kw">await</span>.<span class="ident">ok</span>().<span class="ident">unwrap</span>();
|
||
<span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">resp_rx</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
|
||
<span class="macro">println!</span>(<span class="string">"previous value = {}"</span>, <span class="ident">res</span>);
|
||
}));
|
||
}
|
||
|
||
<span class="comment">// Wait for all tasks to complete</span>
|
||
<span class="kw">for</span> <span class="ident">join_handle</span> <span class="kw">in</span> <span class="ident">join_handles</span>.<span class="ident">drain</span>(..) {
|
||
<span class="ident">join_handle</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
}
|
||
}</code></pre></div>
|
||
<h3 id="broadcast-channel" class="section-header"><a href="#broadcast-channel"><code>broadcast</code> channel</a></h3>
|
||
<p>The <a href="broadcast/index.html"><code>broadcast</code> channel</a> supports sending <strong>many</strong> values from
|
||
<strong>many</strong> producers to <strong>many</strong> consumers. Each consumer will receive
|
||
<strong>each</strong> value. This channel can be used to implement “fan out” style
|
||
patterns common with pub / sub or “chat” systems.</p>
|
||
<p>This channel tends to be used less often than <code>oneshot</code> and <code>mpsc</code> but still
|
||
has its use cases.</p>
|
||
<p>Basic usage</p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use</span> <span class="ident">tokio::sync::broadcast</span>;
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
|
||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="kw-2">mut</span> <span class="ident">rx1</span>) <span class="op">=</span> <span class="ident">broadcast::channel</span>(<span class="number">16</span>);
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">rx2</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">subscribe</span>();
|
||
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="macro">assert_eq!</span>(<span class="ident">rx1</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">10</span>);
|
||
<span class="macro">assert_eq!</span>(<span class="ident">rx1</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">20</span>);
|
||
});
|
||
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="macro">assert_eq!</span>(<span class="ident">rx2</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">10</span>);
|
||
<span class="macro">assert_eq!</span>(<span class="ident">rx2</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">20</span>);
|
||
});
|
||
|
||
<span class="ident">tx</span>.<span class="ident">send</span>(<span class="number">10</span>).<span class="ident">unwrap</span>();
|
||
<span class="ident">tx</span>.<span class="ident">send</span>(<span class="number">20</span>).<span class="ident">unwrap</span>();
|
||
}</code></pre></div>
|
||
<h3 id="watch-channel" class="section-header"><a href="#watch-channel"><code>watch</code> channel</a></h3>
|
||
<p>The <a href="watch/index.html"><code>watch</code> channel</a> supports sending <strong>many</strong> values from a <strong>single</strong>
|
||
producer to <strong>many</strong> consumers. However, only the <strong>most recent</strong> value is
|
||
stored in the channel. Consumers are notified when a new value is sent, but
|
||
there is no guarantee that consumers will see <strong>all</strong> values.</p>
|
||
<p>The <a href="watch/index.html"><code>watch</code> channel</a> is similar to a <a href="broadcast/index.html"><code>broadcast</code> channel</a> with capacity 1.</p>
|
||
<p>Use cases for the <a href="watch/index.html"><code>watch</code> channel</a> include broadcasting configuration
|
||
changes or signalling program state changes, such as transitioning to
|
||
shutdown.</p>
|
||
<p><strong>Example:</strong> use a <a href="watch/index.html"><code>watch</code> channel</a> to notify tasks of configuration
|
||
changes. In this example, a configuration file is checked periodically. When
|
||
the file changes, the configuration changes are signalled to consumers.</p>
|
||
|
||
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use</span> <span class="ident">tokio::sync::watch</span>;
|
||
<span class="kw">use</span> <span class="ident">tokio::time</span>::{<span class="self">self</span>, <span class="ident">Duration</span>, <span class="ident">Instant</span>};
|
||
|
||
<span class="kw">use</span> <span class="ident">std::io</span>;
|
||
|
||
<span class="attribute">#[<span class="ident">derive</span>(<span class="ident">Debug</span>, <span class="ident">Clone</span>, <span class="ident">Eq</span>, <span class="ident">PartialEq</span>)]</span>
|
||
<span class="kw">struct</span> <span class="ident">Config</span> {
|
||
<span class="ident">timeout</span>: <span class="ident">Duration</span>,
|
||
}
|
||
|
||
<span class="kw">impl</span> <span class="ident">Config</span> {
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">load_from_file</span>() -> <span class="ident">io::Result</span><span class="op"><</span><span class="ident">Config</span><span class="op">></span> {
|
||
<span class="comment">// file loading and deserialization logic here</span>
|
||
}
|
||
}
|
||
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">my_async_operation</span>() {
|
||
<span class="comment">// Do something here</span>
|
||
}
|
||
|
||
<span class="attribute">#[<span class="ident">tokio::main</span>]</span>
|
||
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
|
||
<span class="comment">// Load initial configuration value</span>
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">config</span> <span class="op">=</span> <span class="ident">Config::load_from_file</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
|
||
<span class="comment">// Create the watch channel, initialized with the loaded configuration</span>
|
||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">watch::channel</span>(<span class="ident">config</span>.<span class="ident">clone</span>());
|
||
|
||
<span class="comment">// Spawn a task to monitor the file.</span>
|
||
<span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="kw">loop</span> {
|
||
<span class="comment">// Wait 10 seconds between checks</span>
|
||
<span class="ident">time::sleep</span>(<span class="ident">Duration::from_secs</span>(<span class="number">10</span>)).<span class="kw">await</span>;
|
||
|
||
<span class="comment">// Load the configuration file</span>
|
||
<span class="kw">let</span> <span class="ident">new_config</span> <span class="op">=</span> <span class="ident">Config::load_from_file</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
|
||
<span class="comment">// If the configuration changed, send the new config value</span>
|
||
<span class="comment">// on the watch channel.</span>
|
||
<span class="kw">if</span> <span class="ident">new_config</span> <span class="op">!</span><span class="op">=</span> <span class="ident">config</span> {
|
||
<span class="ident">tx</span>.<span class="ident">send</span>(<span class="ident">new_config</span>.<span class="ident">clone</span>()).<span class="ident">unwrap</span>();
|
||
<span class="ident">config</span> <span class="op">=</span> <span class="ident">new_config</span>;
|
||
}
|
||
}
|
||
});
|
||
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">handles</span> <span class="op">=</span> <span class="macro">vec!</span>[];
|
||
|
||
<span class="comment">// Spawn tasks that runs the async operation for at most `timeout`. If</span>
|
||
<span class="comment">// the timeout elapses, restart the operation.</span>
|
||
<span class="comment">//</span>
|
||
<span class="comment">// The task simultaneously watches the `Config` for changes. When the</span>
|
||
<span class="comment">// timeout duration changes, the timeout is updated without restarting</span>
|
||
<span class="comment">// the in-flight operation.</span>
|
||
<span class="kw">for</span> <span class="kw">_</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">5</span> {
|
||
<span class="comment">// Clone a config watch handle for use in this task</span>
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">rx</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">clone</span>();
|
||
|
||
<span class="kw">let</span> <span class="ident">handle</span> <span class="op">=</span> <span class="ident">tokio::spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
|
||
<span class="comment">// Start the initial operation and pin the future to the stack.</span>
|
||
<span class="comment">// Pinning to the stack is required to resume the operation</span>
|
||
<span class="comment">// across multiple calls to `select!`</span>
|
||
<span class="kw">let</span> <span class="ident">op</span> <span class="op">=</span> <span class="ident">my_async_operation</span>();
|
||
<span class="macro">tokio::pin!</span>(<span class="ident">op</span>);
|
||
|
||
<span class="comment">// Get the initial config value</span>
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">conf</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">borrow</span>().<span class="ident">clone</span>();
|
||
|
||
<span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">op_start</span> <span class="op">=</span> <span class="ident">Instant::now</span>();
|
||
<span class="kw">let</span> <span class="ident">sleep</span> <span class="op">=</span> <span class="ident">time::sleep_until</span>(<span class="ident">op_start</span> <span class="op">+</span> <span class="ident">conf</span>.<span class="ident">timeout</span>);
|
||
<span class="macro">tokio::pin!</span>(<span class="ident">sleep</span>);
|
||
|
||
<span class="kw">loop</span> {
|
||
<span class="macro">tokio::select!</span> {
|
||
<span class="kw">_</span> <span class="op">=</span> <span class="kw-2">&mut</span> <span class="ident">sleep</span> => {
|
||
<span class="comment">// The operation elapsed. Restart it</span>
|
||
<span class="ident">op</span>.<span class="ident">set</span>(<span class="ident">my_async_operation</span>());
|
||
|
||
<span class="comment">// Track the new start time</span>
|
||
<span class="ident">op_start</span> <span class="op">=</span> <span class="ident">Instant::now</span>();
|
||
|
||
<span class="comment">// Restart the timeout</span>
|
||
<span class="ident">sleep</span>.<span class="ident">set</span>(<span class="ident">time::sleep_until</span>(<span class="ident">op_start</span> <span class="op">+</span> <span class="ident">conf</span>.<span class="ident">timeout</span>));
|
||
}
|
||
<span class="kw">_</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">changed</span>() => {
|
||
<span class="ident">conf</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">borrow</span>().<span class="ident">clone</span>();
|
||
|
||
<span class="comment">// The configuration has been updated. Update the</span>
|
||
<span class="comment">// `sleep` using the new `timeout` value.</span>
|
||
<span class="ident">sleep</span>.<span class="ident">as_mut</span>().<span class="ident">reset</span>(<span class="ident">op_start</span> <span class="op">+</span> <span class="ident">conf</span>.<span class="ident">timeout</span>);
|
||
}
|
||
<span class="kw">_</span> <span class="op">=</span> <span class="kw-2">&mut</span> <span class="ident">op</span> => {
|
||
<span class="comment">// The operation completed!</span>
|
||
<span class="kw">return</span>
|
||
}
|
||
}
|
||
}
|
||
});
|
||
|
||
<span class="ident">handles</span>.<span class="ident">push</span>(<span class="ident">handle</span>);
|
||
}
|
||
|
||
<span class="kw">for</span> <span class="ident">handle</span> <span class="kw">in</span> <span class="ident">handles</span>.<span class="ident">drain</span>(..) {
|
||
<span class="ident">handle</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
|
||
}
|
||
}</code></pre></div>
|
||
<h2 id="state-synchronization" class="section-header"><a href="#state-synchronization">State synchronization</a></h2>
|
||
<p>The remaining synchronization primitives focus on synchronizing state.
|
||
These are asynchronous equivalents to versions provided by <code>std</code>. They
|
||
operate in a similar way as their <code>std</code> counterparts but will wait
|
||
asynchronously instead of blocking the thread.</p>
|
||
<ul>
|
||
<li>
|
||
<p><a href="struct.Barrier.html"><code>Barrier</code></a> Ensures multiple tasks will wait for each other to
|
||
reach a point in the program, before continuing execution all together.</p>
|
||
</li>
|
||
<li>
|
||
<p><a href="struct.Mutex.html"><code>Mutex</code></a> Mutual Exclusion mechanism, which ensures that at most
|
||
one thread at a time is able to access some data.</p>
|
||
</li>
|
||
<li>
|
||
<p><a href="struct.Notify.html"><code>Notify</code></a> Basic task notification. <code>Notify</code> supports notifying a
|
||
receiving task without sending data. In this case, the task wakes up and
|
||
resumes processing.</p>
|
||
</li>
|
||
<li>
|
||
<p><a href="struct.RwLock.html"><code>RwLock</code></a> Provides a mutual exclusion mechanism which allows
|
||
multiple readers at the same time, while allowing only one writer at a
|
||
time. In some cases, this can be more efficient than a mutex.</p>
|
||
</li>
|
||
<li>
|
||
<p><a href="struct.Semaphore.html"><code>Semaphore</code></a> Limits the amount of concurrency. A semaphore
|
||
holds a number of permits, which tasks may request in order to enter a
|
||
critical section. Semaphores are useful for implementing limiting or
|
||
bounding of any kind.</p>
|
||
</li>
|
||
</ul>
|
||
</div></details><h2 id="modules" class="small-section-header"><a href="#modules">Modules</a></h2>
|
||
<div class="item-table"><div class="item-row"><div class="item-left module-item"><a class="mod" href="broadcast/index.html" title="tokio::sync::broadcast mod">broadcast</a></div><div class="item-right docblock-short"><p>A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
|
||
all consumers.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="mod" href="futures/index.html" title="tokio::sync::futures mod">futures</a></div><div class="item-right docblock-short"><p>Named future types.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="mod" href="mpsc/index.html" title="tokio::sync::mpsc mod">mpsc</a></div><div class="item-right docblock-short"><p>A multi-producer, single-consumer queue for sending values between
|
||
asynchronous tasks.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="mod" href="oneshot/index.html" title="tokio::sync::oneshot mod">oneshot</a></div><div class="item-right docblock-short"><p>A one-shot channel is used for sending a single message between
|
||
asynchronous tasks. The <a href="oneshot/fn.channel.html" title="channel"><code>channel</code></a> function is used to create a
|
||
<a href="oneshot/struct.Sender.html" title="Sender"><code>Sender</code></a> and <a href="oneshot/struct.Receiver.html" title="Receiver"><code>Receiver</code></a> handle pair that form the channel.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="mod" href="watch/index.html" title="tokio::sync::watch mod">watch</a></div><div class="item-right docblock-short"><p>A single-producer, multi-consumer channel that only retains the <em>last</em> sent
|
||
value.</p>
|
||
</div></div></div><h2 id="structs" class="small-section-header"><a href="#structs">Structs</a></h2>
|
||
<div class="item-table"><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.AcquireError.html" title="tokio::sync::AcquireError struct">AcquireError</a></div><div class="item-right docblock-short"><p>Error returned from the <a href="struct.Semaphore.html#method.acquire"><code>Semaphore::acquire</code></a> function.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.Barrier.html" title="tokio::sync::Barrier struct">Barrier</a></div><div class="item-right docblock-short"><p>A barrier enables multiple tasks to synchronize the beginning of some computation.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.BarrierWaitResult.html" title="tokio::sync::BarrierWaitResult struct">BarrierWaitResult</a></div><div class="item-right docblock-short"><p>A <code>BarrierWaitResult</code> is returned by <code>wait</code> when all tasks in the <code>Barrier</code> have rendezvoused.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.MappedMutexGuard.html" title="tokio::sync::MappedMutexGuard struct">MappedMutexGuard</a></div><div class="item-right docblock-short"><p>A handle to a held <code>Mutex</code> that has had a function applied to it via <a href="struct.MutexGuard.html#method.map"><code>MutexGuard::map</code></a>.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.Mutex.html" title="tokio::sync::Mutex struct">Mutex</a></div><div class="item-right docblock-short"><p>An asynchronous <code>Mutex</code>-like type.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.MutexGuard.html" title="tokio::sync::MutexGuard struct">MutexGuard</a></div><div class="item-right docblock-short"><p>A handle to a held <code>Mutex</code>. The guard can be held across any <code>.await</code> point
|
||
as it is <a href="https://doc.rust-lang.org/1.59.0/core/marker/trait.Send.html" title="Send"><code>Send</code></a>.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.Notify.html" title="tokio::sync::Notify struct">Notify</a></div><div class="item-right docblock-short"><p>Notifies a single task to wake up.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.OnceCell.html" title="tokio::sync::OnceCell struct">OnceCell</a></div><div class="item-right docblock-short"><p>A thread-safe cell that can be written to only once.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.OwnedMutexGuard.html" title="tokio::sync::OwnedMutexGuard struct">OwnedMutexGuard</a></div><div class="item-right docblock-short"><p>An owned handle to a held <code>Mutex</code>.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.OwnedRwLockMappedWriteGuard.html" title="tokio::sync::OwnedRwLockMappedWriteGuard struct">OwnedRwLockMappedWriteGuard</a></div><div class="item-right docblock-short"><p>Owned RAII structure used to release the exclusive write access of a lock when
|
||
dropped.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.OwnedRwLockReadGuard.html" title="tokio::sync::OwnedRwLockReadGuard struct">OwnedRwLockReadGuard</a></div><div class="item-right docblock-short"><p>Owned RAII structure used to release the shared read access of a lock when
|
||
dropped.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.OwnedRwLockWriteGuard.html" title="tokio::sync::OwnedRwLockWriteGuard struct">OwnedRwLockWriteGuard</a></div><div class="item-right docblock-short"><p>Owned RAII structure used to release the exclusive write access of a lock when
|
||
dropped.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.OwnedSemaphorePermit.html" title="tokio::sync::OwnedSemaphorePermit struct">OwnedSemaphorePermit</a></div><div class="item-right docblock-short"><p>An owned permit from the semaphore.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.RwLock.html" title="tokio::sync::RwLock struct">RwLock</a></div><div class="item-right docblock-short"><p>An asynchronous reader-writer lock.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.RwLockMappedWriteGuard.html" title="tokio::sync::RwLockMappedWriteGuard struct">RwLockMappedWriteGuard</a></div><div class="item-right docblock-short"><p>RAII structure used to release the exclusive write access of a lock when
|
||
dropped.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.RwLockReadGuard.html" title="tokio::sync::RwLockReadGuard struct">RwLockReadGuard</a></div><div class="item-right docblock-short"><p>RAII structure used to release the shared read access of a lock when
|
||
dropped.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.RwLockWriteGuard.html" title="tokio::sync::RwLockWriteGuard struct">RwLockWriteGuard</a></div><div class="item-right docblock-short"><p>RAII structure used to release the exclusive write access of a lock when
|
||
dropped.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.Semaphore.html" title="tokio::sync::Semaphore struct">Semaphore</a></div><div class="item-right docblock-short"><p>Counting semaphore performing asynchronous permit acquisition.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.SemaphorePermit.html" title="tokio::sync::SemaphorePermit struct">SemaphorePermit</a></div><div class="item-right docblock-short"><p>A permit from the semaphore.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="struct" href="struct.TryLockError.html" title="tokio::sync::TryLockError struct">TryLockError</a></div><div class="item-right docblock-short"><p>Error returned from the <a href="struct.Mutex.html#method.try_lock"><code>Mutex::try_lock</code></a>, <a href="struct.RwLock.html#method.try_read"><code>RwLock::try_read</code></a> and
|
||
<a href="struct.RwLock.html#method.try_write"><code>RwLock::try_write</code></a> functions.</p>
|
||
</div></div></div><h2 id="enums" class="small-section-header"><a href="#enums">Enums</a></h2>
|
||
<div class="item-table"><div class="item-row"><div class="item-left module-item"><a class="enum" href="enum.SetError.html" title="tokio::sync::SetError enum">SetError</a></div><div class="item-right docblock-short"><p>Errors that can be returned from <a href="struct.OnceCell.html#method.set"><code>OnceCell::set</code></a>.</p>
|
||
</div></div><div class="item-row"><div class="item-left module-item"><a class="enum" href="enum.TryAcquireError.html" title="tokio::sync::TryAcquireError enum">TryAcquireError</a></div><div class="item-right docblock-short"><p>Error returned from the <a href="struct.Semaphore.html#method.try_acquire"><code>Semaphore::try_acquire</code></a> function.</p>
|
||
</div></div></div></section><section id="search" class="content hidden"></section></div></main><div id="rustdoc-vars" data-root-path="../../" data-current-crate="tokio" data-themes="ayu,dark,light" data-resource-suffix="" data-rustdoc-version="1.59.0 (9d1b2106e 2022-02-23)" ></div>
|
||
</body></html> |