Skip to content

Commit 96bf20f

Browse files
committed
A good starting point
1 parent 18059c9 commit 96bf20f

34 files changed

+1872
-1
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*.exe
99

1010
# Generated by Cargo
11-
/target/
11+
target/
1212

1313
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
1414
# More information here http://doc.crates.io/guide.html#cargotoml-vs-cargolock

TODO.md

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
### Database
2+
3+
- Use & Improve Config (Done)
4+
- Error Handling (Done)
5+
- Basic Authentication (Done)
6+
- Validation (Done)
7+
- Investigate If Tags Should Be Mandatory (Done)
8+
- Return Event ID On Publish (Done)
9+
- Consider Own Trait Over ToString/FromString Traits (Unnecessary)
10+
11+
### Config
12+
13+
- Use A Separate Config Struct For Each Components (Done)
14+
15+
### Collection
16+
17+
- Use & Improve Config (Done)
18+
- Use Subscription Strategy In Config (Done)
19+
- Error Handling (Done)
20+
- Improve Subscribe Method (Done)
21+
- Subscription Routing Strategy (Done)
22+
- Support Collection-Specific Config (Done)
23+
- Improve File Reader & File Writer Methods (Done)
24+
25+
### Connection
26+
27+
- Error Handling (Done)
28+
- Add Close Method (Done)
29+
30+
### Event
31+
32+
- Add Timestamp (Done)
33+
- Improve Serialization If Necessary (Done)
34+
- Method To Set ID And Timestamp Should Be Private (Unnecessary)
35+
36+
### Subscription
37+
38+
- Error Handling (Done)
39+
- Improve API (Done)
40+
- Add Current Events Queries (Done)
41+
42+
### Writer
43+
44+
- Error Handling (Done)
45+
- Change API To Disallow Event ID Changes If Necessary (Unnecessary)
46+
47+
### Scanner
48+
49+
- Use Config (Unnecessary)
50+
- Error Handling (Done)
51+
52+
53+
### Net
54+
55+
- Move/Refactor Utility Functions (Done)
56+
57+
### Protocol
58+
59+
- Improve Event Stream (Done)
60+
61+
### Server
62+
63+
- Error Handling (Done)
64+
- Use Config (Done)
65+
- Improve Constructor & API (Done)
66+
67+
### Handler
68+
69+
- Error Handling (Done)
70+
- Improve Commands/Results Serialization (Done)
71+
- Move Protocol Into Its Own Module (Done)
72+
73+
### Client
74+
75+
- Error Handling (Done)
76+
- Improve Commands/Results Serialization (Done)
77+
- Add Close Method (Done)
78+
79+
### General
80+
81+
- Rename Components If Necessary (-)
82+
- Derive Relevant Traits For All Structs/Enums (-)
83+
- Implement Display For Relevant Structs/Enums (-)
84+
- Write Tests (-)
85+
- Write Docs (-)
86+
- Split Core/Server/Client Into Separate Crates (-)

exar-client/Cargo.toml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "exar-client"
3+
version = "0.1.0"
4+
authors = ["Bruno Filippone <[email protected]>"]
5+
6+
[dependencies]
7+
exar = { version = "0.1", path = "../exar-core" }
8+
exar-net = { version = "0.1", path = "../exar-net" }

exar-client/src/lib.rs

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
extern crate exar;
2+
extern crate exar_net;
3+
4+
use exar::*;
5+
use exar_net::*;
6+
7+
use std::io::{Error as IoError, ErrorKind};
8+
use std::net::TcpStream;
9+
use std::sync::mpsc::channel;
10+
use std::thread;
11+
12+
pub struct Client {
13+
stream: Stream
14+
}
15+
16+
impl Client {
17+
pub fn connect(address: &str, collection_name: &str, username: Option<&str>, password: Option<&str>) -> Result<Client, DatabaseError> {
18+
match TcpStream::connect(address) {
19+
Ok(stream) => {
20+
let mut stream = try!(Stream::new(stream));
21+
let username = username.map(|u| u.to_owned());
22+
let password = password.map(|p| p.to_owned());
23+
let connection_message = TcpMessage::Connect(collection_name.to_owned(), username, password);
24+
try!(stream.send_message(connection_message));
25+
match stream.receive_message() {
26+
Ok(TcpMessage::Connected) => Ok(Client { stream: stream }),
27+
Ok(TcpMessage::Error(error)) => Err(error),
28+
Ok(_) => Err(DatabaseError::ConnectionError),
29+
Err(err) => Err(err)
30+
}
31+
},
32+
Err(err) => Err(DatabaseError::IoError(err))
33+
}
34+
}
35+
36+
pub fn publish(&mut self, event: Event) -> Result<usize, DatabaseError> {
37+
try!(self.stream.send_message(TcpMessage::Publish(event)));
38+
match self.stream.receive_message() {
39+
Ok(TcpMessage::Published(event_id)) => Ok(event_id),
40+
Ok(TcpMessage::Error(error)) => Err(error),
41+
Ok(_) => Err(DatabaseError::IoError(IoError::new(ErrorKind::InvalidData, UnexpectedTcpMessage))),
42+
Err(err) => Err(err)
43+
}
44+
}
45+
46+
pub fn subscribe(&mut self, query: Query) -> Result<EventStream, DatabaseError> {
47+
let subscribe_message = TcpMessage::Subscribe(query.live, query.offset, query.limit, query.tag);
48+
self.stream.send_message(subscribe_message).and_then(|_| {
49+
let (send, recv) = channel();
50+
self.stream.try_clone().and_then(|cloned_stream| {
51+
thread::spawn(move || {
52+
for message in cloned_stream.messages() {
53+
match message {
54+
Ok(TcpMessage::Event(event)) => match send.send(event) {
55+
Ok(_) => continue,
56+
Err(err) => println!("Unable to send event to the event stream: {}", err)
57+
},
58+
Ok(TcpMessage::EndOfStream) => (),
59+
Ok(TcpMessage::Error(error)) => println!("Received error from TCP stream: {:?}", error),
60+
Ok(message) => println!("Unexpected TCP message: {:?}", message),
61+
Err(err) => println!("Unable to read TCP message from stream: {:?}", err)
62+
};
63+
break
64+
}
65+
});
66+
Ok(EventStream::new(recv))
67+
})
68+
})
69+
}
70+
71+
pub fn close(self) {
72+
drop(self)
73+
}
74+
}

exar-core/Cargo.toml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "exar"
3+
version = "0.1.0"
4+
authors = ["Bruno Filippone <[email protected]>"]
5+
6+
[dependencies]
7+
rand = "0.3"
8+
time = "0.1"
9+
rustc-serialize = { optional = true, version = "0.3" }
10+
serde = { optional = true, version = "0.6" }
11+
serde_macros = { optional = true, version = "0.6" }
12+
13+
[features]
14+
default = ["rustc-serialize"]
15+
serde-serialization = ["serde", "serde_macros"]

exar-core/src/collection.rs

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use super::*;
2+
3+
use rand;
4+
use rand::Rng;
5+
use std::sync::mpsc::channel;
6+
7+
pub struct Collection {
8+
log: Log,
9+
scanners: Vec<Scanner>,
10+
routing_strategy: RoutingStrategy,
11+
writer: Writer
12+
}
13+
14+
impl Collection {
15+
pub fn new(collection_name: &str, config: CollectionConfig) -> Result<Collection, DatabaseError> {
16+
let log = Log::new(&config.logs_path, collection_name);
17+
Writer::new(log.clone()).and_then(|writer| {
18+
let mut scanners = vec![];
19+
for _ in 0..config.num_scanners {
20+
let scanner = try!(Scanner::new(log.clone()));
21+
scanners.push(scanner);
22+
}
23+
Ok(Collection {
24+
log: log,
25+
scanners: scanners,
26+
routing_strategy: config.routing_strategy.clone(),
27+
writer: writer
28+
})
29+
})
30+
}
31+
32+
pub fn publish(&self, event: Event) -> Result<usize, DatabaseError> {
33+
self.writer.store(event)
34+
}
35+
36+
pub fn subscribe(&mut self, query: Query) -> Result<EventStream, DatabaseError> {
37+
let (send, recv) = channel();
38+
self.apply_strategy(Subscription::new(send, query)).and_then(|updated_strategy| {
39+
self.routing_strategy = updated_strategy;
40+
Ok(EventStream::new(recv))
41+
})
42+
}
43+
44+
pub fn drop(&self) -> Result<(), DatabaseError> {
45+
self.drop_scanners();
46+
match self.log.remove() {
47+
Ok(()) => Ok(()),
48+
Err(err) => Err(DatabaseError::IoError(err))
49+
}
50+
}
51+
52+
fn drop_scanners(&self) {
53+
for scanner in &self.scanners {
54+
drop(scanner);
55+
}
56+
}
57+
58+
fn apply_strategy(&mut self, subscription: Subscription) -> Result<RoutingStrategy, DatabaseError> {
59+
match self.routing_strategy {
60+
RoutingStrategy::Random => match rand::thread_rng().choose(&self.scanners) {
61+
Some(random_scanner) => {
62+
random_scanner.handle_subscription(subscription).and_then(|_| {
63+
Ok(RoutingStrategy::Random)
64+
})
65+
},
66+
None => Err(DatabaseError::SubscriptionError)
67+
},
68+
RoutingStrategy::RoundRobin(ref last_index) => {
69+
let mut new_index = 0;
70+
if last_index + 1 < self.scanners.len() {
71+
new_index = last_index + 1;
72+
}
73+
match self.scanners.get(new_index) {
74+
Some(scanner) => scanner.handle_subscription(subscription).and_then(|_| {
75+
Ok(RoutingStrategy::RoundRobin(new_index))
76+
}),
77+
None => Err(DatabaseError::SubscriptionError)
78+
}
79+
}
80+
}
81+
}
82+
}

exar-core/src/config.rs

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use super::*;
2+
3+
use std::collections::BTreeMap;
4+
5+
#[cfg_attr(feature = "rustc-serialize", derive(RustcEncodable, RustcDecodable))]
6+
#[cfg_attr(feature = "serde-serialization", derive(Serialize, Deserialize))]
7+
#[derive(Clone, Debug)]
8+
pub struct DatabaseConfig {
9+
pub logs_path: String,
10+
pub num_scanners: u8,
11+
pub routing_strategy: RoutingStrategy,
12+
pub collections: BTreeMap<String, CollectionConfig>
13+
}
14+
15+
impl Default for DatabaseConfig {
16+
fn default() -> DatabaseConfig {
17+
DatabaseConfig {
18+
logs_path: "".to_owned(),
19+
num_scanners: 2,
20+
routing_strategy: RoutingStrategy::default(),
21+
collections: BTreeMap::new()
22+
}
23+
}
24+
}
25+
26+
impl DatabaseConfig {
27+
pub fn get_collection_config(&self, collection_name: &str) -> CollectionConfig {
28+
match self.collections.get(collection_name) {
29+
Some(collection_config) => collection_config.clone(),
30+
None => CollectionConfig {
31+
logs_path: self.logs_path.clone(),
32+
num_scanners: self.num_scanners,
33+
routing_strategy: self.routing_strategy.clone()
34+
}
35+
}
36+
}
37+
}
38+
39+
#[cfg_attr(feature = "rustc-serialize", derive(RustcEncodable, RustcDecodable))]
40+
#[cfg_attr(feature = "serde-serialization", derive(Serialize, Deserialize))]
41+
#[derive(Clone, Debug)]
42+
pub struct CollectionConfig {
43+
pub logs_path: String,
44+
pub num_scanners: u8,
45+
pub routing_strategy: RoutingStrategy
46+
}

exar-core/src/connection.rs

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use super::*;
2+
3+
use std::sync::{Arc, Mutex};
4+
5+
#[derive(Clone)]
6+
pub struct Connection {
7+
collection: Arc<Mutex<Collection>>
8+
}
9+
10+
impl Connection {
11+
pub fn new(collection: Arc<Mutex<Collection>>) -> Connection {
12+
Connection {
13+
collection: collection
14+
}
15+
}
16+
17+
pub fn publish(&self, event: Event) -> Result<usize, DatabaseError> {
18+
self.collection.lock().unwrap().publish(event)
19+
}
20+
21+
pub fn subscribe(&self, query: Query) -> Result<EventStream, DatabaseError> {
22+
self.collection.lock().unwrap().subscribe(query)
23+
}
24+
25+
pub fn close(self) {
26+
drop(self)
27+
}
28+
}

0 commit comments

Comments
 (0)