Skip to content

Commit

Permalink
rust-toolchain: upgrade to Rust 1.40.0
Browse files Browse the repository at this point in the history
Option::as_deref was stablized in Rust 1.40, so we can drop our version
of it, mz_as_deref, from ore::option::OptionExt.

Clippy now caches warnings, so we no longer need to clean before running
`cargo clippy`.

The other changes in this PR are to comply with Clippy's new lints,
which are mostly about avoiding unnecessary calls to `.clone()`.
  • Loading branch information
benesch committed Dec 20, 2019
1 parent a9b5946 commit 5e7873c
Show file tree
Hide file tree
Showing 22 changed files with 92 additions and 184 deletions.
69 changes: 6 additions & 63 deletions bin/check
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,6 @@ set -euo pipefail
cd "$(dirname "$0")/.."
source misc/shlib/shlib.bash

usage() {
# shellcheck disable=SC2006
die "usage: $0 [`uf --[no-]force-build`]
Run cargo clippy with our CI flags
Args:
Both of these override the 'MZ_CHECK_FORCE_BUILD=[yn]' environment var:
`uf -f --force-build` Force clippy to build every project, not just changed ones (default)
`uf -n --no-force-build` Run clippy on just files that have been changed since the last
time it was run (do not force a full rebuild)"
}

main() {
set_args "$@"
run_clippy
}

# Args
MZ_CHECK_FORCE_BUILD=${MZ_CHECK_FORCE_BUILD:-y}
set_args() {
while [[ $# -gt 0 ]]; do
case $1 in
-n|--no-force-build)
MZ_CHECK_FORCE_BUILD=n
shift
;;
-f|--force-build)
MZ_CHECK_FORCE_BUILD=y
shift
;;
*) usage ;;
esac
done
}

# Add lints to this list freely, but please add a comment with justification
# along with the lint. The goal is to ever-so-slightly increase the barrier to
# turning off a lint.
Expand Down Expand Up @@ -196,28 +158,9 @@ extra_lints=(
# missing_debug_implementations
)

run_clippy() {
# NOTE(benesch): we ignore some ShellCheck complaints about sloppy word
# splitting below. It's substantially clearer than doing things "properly,"
# and the inputs to this script are trusted.

# shellcheck disable=SC2086
if [[ $MZ_CHECK_FORCE_BUILD == y ]]; then
pkgspec=$(sed -nE 's,.*"src/([^"]+)".*,src/\1,p' Cargo.toml)
echo -n "run> touch"
for pkg in $pkgspec; do
for fname in lib.rs main.rs; do
path="$pkg/$fname"
if [[ -f "$path" ]]; then
touch "$path"
echo -n " $path"
fi
done
done
echo
fi
# shellcheck disable=SC2046
runv cargo clippy -- -D warnings $(printf -- "-D %s " "${extra_lints[@]}") $(printf -- "-A %s " "${disabled_lints[@]}")
}

main "$@"
# NOTE(benesch): we ignore some ShellCheck complaints about sloppy word
# splitting below. It's substantially clearer than doing things "properly,"
# and the inputs to this script are trusted.

# shellcheck disable=SC2046
run cargo clippy -- -D warnings $(printf -- "-D %s " "${extra_lints[@]}") $(printf -- "-A %s " "${disabled_lints[@]}")
2 changes: 1 addition & 1 deletion ci/slt/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ steps:
timeout_in_minutes: 10
plugins:
- docker#v3.1.0:
image: materialize/ci-builder:1.39.0-20191111-160627
image: materialize/ci-builder:1.40.0-20191219-232317
propagate-uid-gid: true
mount-ssh-agent: true
environment:
Expand Down
2 changes: 1 addition & 1 deletion ci/test/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ docker_run() {
--env OPENSSL_LIB_DIR=/usr/lib/x86_64-linux-gnu \
--env OPENSSL_INCLUDE_DIR=/usr/include \
--user "$(id -u):$(id -g)" \
materialize/ci-builder:1.39.0-20191111-160627 bash -c "$1"
materialize/ci-builder:1.40.0-20191219-232317 bash -c "$1"
}

ci_init
Expand Down
4 changes: 2 additions & 2 deletions ci/test/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ steps:
timeout_in_minutes: 10
plugins:
- docker#v3.1.0:
image: materialize/ci-builder:1.39.0-20191111-160627
image: materialize/ci-builder:1.40.0-20191219-232317
propagate-uid-gid: true
mount-ssh-agent: true
volumes:
Expand All @@ -35,7 +35,7 @@ steps:
timeout_in_minutes: 30
plugins:
- docker#v3.1.0:
image: materialize/ci-builder:1.39.0-20191111-160627
image: materialize/ci-builder:1.40.0-20191219-232317
propagate-uid-gid: true
mount-ssh-agent: true
volumes:
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.39.0
1.40.0
27 changes: 15 additions & 12 deletions src/comm/switchboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

//! Traffic routing.
use std::cmp::Ordering;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -156,28 +157,30 @@ where
Pin<Box<dyn Future<Output = Result<Option<C>, io::Error>> + Send>>,
>::new();
for (i, addr) in self.0.nodes.iter().enumerate() {
if i < self.0.id {
match i.cmp(&self.0.id) {
// Earlier node. Wait for it to connect to us.
futures.push(Box::pin(
Ordering::Less => futures.push(Box::pin(
self.0
.rendezvous_table
.lock()
.expect("lock poisoned")
.add_dest(i as u64)
.into_future()
.map(|(conn, _stream)| Ok(conn)),
));
} else if i == self.0.id {
)),

// Ourselves. Nothing to do.
futures.push(Box::pin(future::ok(None)));
} else {
Ordering::Equal => futures.push(Box::pin(future::ok(None))),

// Later node. Attempt to initiate connection.
let id = self.0.id as u64;
futures.push(Box::pin(
TryConnectFuture::new(addr.clone(), timeout)
.and_then(move |conn| protocol::send_rendezvous_handshake(conn, id))
.map_ok(|conn| Some(conn)),
));
Ordering::Greater => {
let id = self.0.id as u64;
futures.push(Box::pin(
TryConnectFuture::new(addr.clone(), timeout)
.and_then(move |conn| protocol::send_rendezvous_handshake(conn, id))
.map_ok(|conn| Some(conn)),
));
}
}
}
futures.try_collect().await
Expand Down
5 changes: 2 additions & 3 deletions src/coord/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use dataflow_types::{
};
use expr::{EvalEnv, GlobalId, Id, IdHumanizer, RelationExpr, ScalarExpr};
use ore::collections::CollectionExt;
use ore::option::OptionExt;
use repr::{ColumnName, Datum, QualName, RelationDesc, Row};
use sql::{MutationKind, ObjectType, Plan, Session};
use sql::{Params, PreparedStatement};
Expand Down Expand Up @@ -113,7 +112,7 @@ where
None
};

let catalog_path = catalog_path.mz_as_deref();
let catalog_path = catalog_path.as_deref();
let catalog = if let Some(logging_config) = config.logging {
Catalog::open(
catalog_path,
Expand Down Expand Up @@ -346,7 +345,7 @@ where
name,
Source {
connector: SourceConnector::Local,
desc: desc.clone(),
desc,
},
)])?;
ExecuteResponse::CreatedTable
Expand Down
38 changes: 20 additions & 18 deletions src/dataflow/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,38 @@
// This file is part of Materialize. Materialize may not be used or
// distributed without the express permission of Materialize, Inc.

use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::rc::Rc;

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arrange;
use differential_dataflow::operators::arrange::arrangement::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::AsCollection;
use std::any::Any;
use std::collections::HashMap;
use std::collections::HashSet;
use std::rc::Rc;
use timely::communication::Allocate;
use timely::dataflow::operators::unordered_input::UnorderedInput;
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::worker::Worker as TimelyWorker;
use tokio;

use dataflow_types::*;
use expr::{EvalEnv, GlobalId, Id, RelationExpr};
use repr::{Datum, Row, RowPacker, RowUnpacker};

use self::context::{ArrangementFlavor, Context};
use super::sink;
use super::source;
use super::source::FileReadStyle;
use crate::arrangement::manager::{KeysValsSpine, TraceManager, WithDrop};
use crate::decode::decode;
use crate::logging::materialized::{Logger, MaterializedEvent};
use crate::server::LocalInput;

mod context;
use crate::decode::decode;
use context::{ArrangementFlavor, Context};

pub(crate) fn build_dataflow<A: Allocate>(
dataflow: DataflowDesc,
Expand Down Expand Up @@ -607,16 +608,17 @@ where
for equivalence in variables.iter() {
// Keep columns that are needed for future joins
if equivalence.last().unwrap().0 > index {
if equivalence[0].0 < index {
old_outputs.push(
match equivalence[0].0.cmp(&index) {
Ordering::Less => old_outputs.push(
columns.iter().position(|c2| equivalence[0] == *c2).unwrap(),
);
} else if equivalence[0].0 == index {
new_outputs.push(equivalence[0].1);
),
Ordering::Equal => new_outputs.push(equivalence[0].1),
Ordering::Greater => {
// If the relation exceeds the current index,
// we don't need to worry about retaining it
// at this moment.
}
}
// If the relation exceeds the current index,
// we don't need to worry about retaining it
// at this moment.
}

// If a key exists in `joined`
Expand Down Expand Up @@ -1048,7 +1050,7 @@ where
);

let index = (0..keys_clone.len()).collect::<Vec<_>>();
self.set_local_columns(relation_expr, &index[..], arrangement.clone());
self.set_local_columns(relation_expr, &index[..], arrangement);
}
}

Expand Down Expand Up @@ -1142,7 +1144,7 @@ where
});

let index = (0..group_key.len()).collect::<Vec<_>>();
self.set_local_columns(relation_expr, &index[..], arrangement.clone());
self.set_local_columns(relation_expr, &index[..], arrangement);
}
}

Expand Down Expand Up @@ -1201,7 +1203,7 @@ where
};

let index = (0..keys.len()).collect::<Vec<_>>();
self.set_local_columns(relation_expr, &index[..], arranged.clone());
self.set_local_columns(relation_expr, &index[..], arranged);
}
}
}
1 change: 0 additions & 1 deletion src/expr/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,6 @@ impl RelationExpr {
keys_and_values
.let_in(id_gen, |_id_gen, get_keys_and_values| {
Ok(get_keys_and_values
.clone()
.distinct_by((0..keys.arity()).collect())
.negate()
.union(keys)
Expand Down
2 changes: 1 addition & 1 deletion src/expr/transform/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Unbind {
{
unbind_all(&mut value, env);

let local = env.bind_local(id, *value.clone());
let local = env.bind_local(id, *value);
*expr = *body;
unbind_all(expr, local.env);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/expr/transform/nonnull_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl NonNullRequirements {
columns.iter().all(|c| datums[*c] != repr::Datum::Null)
}),
RelationExpr::Get { id, .. } => {
gets.entry(*id).or_insert(Vec::new()).push(columns.clone());
gets.entry(*id).or_insert(Vec::new()).push(columns);
}
RelationExpr::Let { id, value, body } => {
// Let harvests any non-null requirements from its body,
Expand Down
5 changes: 1 addition & 4 deletions src/expr/transform/reduce_elision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ impl ReduceElision {
Datum::Int64(0),
column_type.clone().nullable(false),
),
ScalarExpr::literal(
Datum::Int64(1),
column_type.clone().nullable(false),
),
ScalarExpr::literal(Datum::Int64(1), column_type.nullable(false)),
)
}
// CountAll is one no matter what the input.
Expand Down
2 changes: 1 addition & 1 deletion src/expr/transform/simplify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl SimplifyFilterPredicates {
input: Box::from(relation.take_dangerous()),
outputs: columns_to_keep,
};
mem::replace(relation, full_new_relation.clone());
mem::replace(relation, full_new_relation);
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/materialized/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use dataflow_types::logging::LoggingConfig;
use ore::future::OreTryFutureExt;
use ore::netio;
use ore::netio::{SniffedStream, SniffingStream};
use ore::option::OptionExt;
use ore::thread::{JoinHandleExt, JoinOnDropHandle};
use ore::tokio::net::TcpStreamExt;

Expand Down Expand Up @@ -235,10 +234,10 @@ pub fn serve(mut config: Config) -> Result<Server, failure::Error> {
let mut coord = coord::Coordinator::new(coord::Config {
switchboard: switchboard.clone(),
num_timely_workers,
symbiosis_url: config.symbiosis_url.mz_as_deref(),
symbiosis_url: config.symbiosis_url.as_deref(),
logging: logging_config.as_ref(),
bootstrap_sql: config.bootstrap_sql,
data_directory: config.data_directory.mz_as_deref(),
data_directory: config.data_directory.as_deref(),
executor: &executor,
})?;
Some(thread::spawn(move || coord.serve(cmd_rx)).join_on_drop())
Expand All @@ -253,7 +252,7 @@ pub fn serve(mut config: Config) -> Result<Server, failure::Error> {
config.process,
switchboard,
executor,
logging_config.clone(),
logging_config,
)
.map_err(|s| format_err!("{}", s))?;

Expand Down
16 changes: 3 additions & 13 deletions src/ore/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,9 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// First, drain the incoming stream queue.
loop {
match self.incoming_streams.poll_next_unpin(cx) {
Poll::Ready(Some(stream)) => {
// New stream available. Add it to the set of active
// streams. Then look for more incoming streams.
self.active_streams.push(stream.into_future())
}
Poll::Ready(None) | Poll::Pending => {
// The incoming stream queue is drained, at least for now.
// Move on to checking for ready items.
break;
}
}
while let Poll::Ready(Some(stream)) = self.incoming_streams.poll_next_unpin(cx) {
// New stream available. Add it to the set of active streams.
self.active_streams.push(stream.into_future())
}

// Second, try to find an item from a ready stream.
Expand Down
Loading

0 comments on commit 5e7873c

Please sign in to comment.