Skip to content

Commit

Permalink
Merge branch 'main' into will/fp-108-optimistic-writes
Browse files Browse the repository at this point in the history
  • Loading branch information
WillCorrigan committed Dec 2, 2024
2 parents ddf3423 + f4fc655 commit 4b9ac99
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 173 deletions.
81 changes: 37 additions & 44 deletions packages-rs/drainpipe/src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ const MAX_WANTED_DIDS: usize = 10_000;
const JETSTREAM_ZSTD_DICTIONARY: &[u8] = include_bytes!("../zstd_dictionary");

/// A receiver channel for consuming Jetstream events.
pub type JetstreamReceiver = flume::Receiver<JetstreamEvent>;
pub type JetstreamReceiver = flume::Receiver<Result<JetstreamEvent, JetstreamEventError>>;

/// An internal sender channel for sending Jetstream events to [JetstreamReceiver]'s.
type JetstreamSender = flume::Sender<JetstreamEvent>;
type JetstreamSender = flume::Sender<Result<JetstreamEvent, JetstreamEventError>>;

/// A wrapper connector type for working with a WebSocket connection to a Jetstream instance to
/// receive and consume events. See [JetstreamConnector::connect] for more info.
Expand Down Expand Up @@ -242,50 +242,43 @@ async fn websocket_task(
return Err(JetstreamEventError::WebSocketCloseFailure);
}

Some(Ok(Message::Text(json))) => {
let event = serde_json::from_str::<JetstreamEvent>(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;

if let Err(e) = send_channel.send(event) {
// We can assume that all receivers have been dropped, so we can close the
// connection and exit the task.
log::info!(
"All receivers for the Jetstream connection have been dropped, closing connection. {:?}", e
);
return Ok(());
}
}

Some(Ok(Message::Binary(zstd_json))) => {
let mut cursor = Cursor::new(zstd_json);
let mut decoder =
zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
.map_err(JetstreamEventError::CompressionDictionaryError)?;

let mut json = String::new();
decoder
.read_to_string(&mut json)
.map_err(JetstreamEventError::CompressionDecoderError)?;

let event = serde_json::from_str::<JetstreamEvent>(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;

if let Err(e) = send_channel.send(event) {
// We can assume that all receivers have been dropped, so we can close the
// connection and exit the task.
log::info!(
"All receivers for the Jetstream connection have been dropped, closing connection... {:?}", e
);
return Ok(());
}
}
Some(message) => {
send_channel.send(decode_message(message, &dictionary)).map_err(|e| {
log::error!("All receivers for the Jetstream connection have been dropped, closing connection. {:?}", e);

unexpected => {
log::error!(
"Received an unexpected message type from Jetstream: {:?}",
unexpected
);
JetstreamEventError::WebSocketCloseFailure
})?;
}
}
}
}

fn decode_message(
message: Result<Message, tokio_tungstenite::tungstenite::Error>,
dictionary: &DecoderDictionary<'_>,
) -> Result<JetstreamEvent, JetstreamEventError> {
let json = match message {
Ok(Message::Text(json)) => json,

Ok(Message::Binary(zstd_json)) => {
let mut cursor = Cursor::new(zstd_json);
let mut decoder =
zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
.map_err(JetstreamEventError::CompressionDictionaryError)?;

let mut json = String::new();
decoder
.read_to_string(&mut json)
.map_err(JetstreamEventError::CompressionDecoderError)?;

json
}

Ok(msg) => Err(JetstreamEventError::UnexpectedEvent(msg))?,

Err(e) => Err(JetstreamEventError::WebsocketReceiveFailure(e))?,
};

serde_json::from_str::<JetstreamEvent>(&json)
.map_err(|e| JetstreamEventError::ReceivedMalformedJSON { error: e, json })
}
13 changes: 11 additions & 2 deletions packages-rs/drainpipe/src/jetstream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@ pub enum ConnectionError {
/// See [websocket_task](crate::websocket_task).
#[derive(Error, Debug)]
pub enum JetstreamEventError {
#[error("received websocket message that could not be deserialized as JSON: {0}")]
ReceivedMalformedJSON(#[from] serde_json::Error),
#[error(
"received websocket message that could not be deserialized as JSON: {json:?}. Error: {error:?}"
)]
ReceivedMalformedJSON {
error: serde_json::Error,
json: String,
},
#[error("failed to load built-in zstd dictionary for decoding: {0}")]
CompressionDictionaryError(io::Error),
#[error("failed to decode zstd-compressed message: {0}")]
CompressionDecoderError(io::Error),
#[error("all receivers were dropped but the websocket connection failed to close cleanly")]
WebSocketCloseFailure,
#[error("Received a websocket error: {0}")]
WebsocketReceiveFailure(tokio_tungstenite::tungstenite::Error),
#[error("Received an event that was not a binary or text message: {0}")]
UnexpectedEvent(tokio_tungstenite::tungstenite::Message),
}
14 changes: 12 additions & 2 deletions packages-rs/drainpipe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use config::Config;
use jetstream::event::{CommitEvent, JetstreamEvent};
use jetstream::{
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
JetstreamReceiver,
};
use serde_json::json;
use std::path::PathBuf;
Expand Down Expand Up @@ -67,7 +68,7 @@ async fn main() -> anyhow::Result<()> {

loop {
match receiver.recv_async().await {
Ok(event) => {
Ok(Ok(event)) => {
monitor
.instrument(async {
if let JetstreamEvent::Commit(ref commit) = event {
Expand All @@ -90,6 +91,14 @@ async fn main() -> anyhow::Result<()> {
.await?
}

Ok(Err(e)) => {
// TODO: This should add a dead letter
log::error!(
"Error receiving event (possible junk event structure?): {:?}",
e
);
}

Err(e) => {
log::error!("Error receiving event: {:?}", e);
break;
Expand All @@ -99,10 +108,11 @@ async fn main() -> anyhow::Result<()> {

metric_logs_abort_handler.abort();
log::info!("WebSocket connection closed, attempting to reconnect...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

async fn connect(config: JetstreamConfig) -> anyhow::Result<flume::Receiver<JetstreamEvent>> {
async fn connect(config: JetstreamConfig) -> anyhow::Result<JetstreamReceiver> {
let jetstream = JetstreamConnector::new(config)?;
let mut retry_delay_seconds = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import { getHandle, getKey, getPds } from "@atproto/identity";
import { verifyRecords } from "@atproto/repo";
import { Suspense } from "react";

export default async function RkeyPage({
params,
}: {
params: {
export default async function RkeyPage(props: {
params: Promise<{
identifier: string;
collection: string;
rkey: string;
};
}>;
}) {
const params = await props.params;
const identityResult = await resolveIdentity(params.identifier);
if (!identityResult.success) {
return <div>🚨 {identityResult.error}</div>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { resolveIdentity } from "@/lib/atproto-server";
import { CollapsedDidSummary } from "@/app/at/_lib/did-components";

export default async function Layout({
children,
params,
}: {
export default async function Layout(props: {
children: React.ReactNode;
params: { identifier: string };
params: Promise<{ identifier: string }>;
}) {
const params = await props.params;

const { children } = props;

const identityResult = await resolveIdentity(params.identifier);
if (!identityResult.success) {
return <div>🚨 {identityResult.error}</div>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import Link from "@/lib/link";
import { SWRConfig } from "swr";
import { CollectionItems } from "../../_lib/collection";

export default async function CollectionPage({
params,
}: {
params: { identifier: string; collection: string };
export default async function CollectionPage(props: {
params: Promise<{ identifier: string; collection: string }>;
}) {
const params = await props.params;
const identityResult = await resolveIdentity(params.identifier);
if (!identityResult.success) {
return <div>{identityResult.error}</div>;
Expand Down
7 changes: 3 additions & 4 deletions packages/atproto-browser/app/at/[identifier]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import { getPds } from "@atproto/identity";
import { describeRepo } from "@/lib/atproto";
import { isDidWeb } from "@atproto/did";

export default async function IdentifierPage({
params,
}: {
params: { identifier: string };
export default async function IdentifierPage(props: {
params: Promise<{ identifier: string }>;
}) {
const params = await props.params;
const identityResult = await resolveIdentity(params.identifier);
if (!identityResult.success) {
return <div>{identityResult.error}</div>;
Expand Down
9 changes: 4 additions & 5 deletions packages/atproto-browser/app/blob/[identifier]/[cid]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ import { resolveIdentity } from "@/lib/atproto-server";
import { getPds } from "@atproto/identity";
import { CSSProperties, ReactNode } from "react";

export default async function BlobPage({
params,
}: {
params: {
export default async function BlobPage(props: {
params: Promise<{
identifier: string;
cid: string;
};
}>;
}) {
const params = await props.params;
const identityResult = await resolveIdentity(params.identifier);
if (!identityResult.success) {
return <p>{identityResult.error}</p>;
Expand Down
6 changes: 6 additions & 0 deletions packages/atproto-browser/app/layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ export default function RootLayout({
}>) {
return (
<html lang="en">
<head>
<meta
name="google-site-verification"
content="Ft-CpRXefaiprzpKZCqaZZWvvWMBcuFvAhkv-l3f8vU"
/>
</head>
<body>
{children}
<script
Expand Down
12 changes: 12 additions & 0 deletions packages/atproto-browser/app/robots.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { MetadataRoute } from "next";

export default function robots(): MetadataRoute.Robots {
return {
rules: {
userAgent: "*",
// Disallow all paths except for the root path.
allow: "/$",
disallow: "/",
},
};
}
4 changes: 2 additions & 2 deletions packages/atproto-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"@atproto/syntax": "^0.3.0",
"hls.js": "^1.5.15",
"http-link-header": "^1.1.3",
"next": "15.0.0-rc.0",
"next": "15.0.3",
"node-html-parser": "^6.1.13",
"parse-hls": "^1.0.7",
"react": "19.0.0-rc-f994737d14-20240522",
Expand All @@ -35,7 +35,7 @@
"@types/react": "^18.3.10",
"@types/react-dom": "^18.3.0",
"eslint": "^8",
"eslint-config-next": "15.0.0-rc.0",
"eslint-config-next": "15.0.3",
"tsx": "^4.16.5",
"typescript": "^5",
"vite-tsconfig-paths": "^4.3.2",
Expand Down
2 changes: 1 addition & 1 deletion packages/frontpage/app/(app)/_components/post-card.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export async function PostCard({
getVerifiedHandle(author),
getUser(),
]);
const postHref = `/post/${handle}/${rkey}`;
const postHref = `/post/${handle ?? author}/${rkey}`;

return (
// TODO: Make article route to postHref via onClick on card except innser links or buttons
Expand Down
Loading

0 comments on commit 4b9ac99

Please sign in to comment.