Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/chatbots+streams #72

Merged
merged 27 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6eb0860
feat: rindexer stream with sns support but foundations also complete
joshstevens19 Aug 2, 2024
27c79ee
refactor: move stream manifest to own file
joshstevens19 Aug 2, 2024
14454dd
feat: webhooks
joshstevens19 Aug 3, 2024
f4a25a7
feat: rabbitmq
joshstevens19 Aug 4, 2024
4eb90e8
feat: kafka
joshstevens19 Aug 4, 2024
025490c
feat: kafka + rabbitmq
joshstevens19 Aug 4, 2024
19151a2
feat: telegram chatbot
joshstevens19 Aug 5, 2024
be56c6a
feat: support conditions on streams
joshstevens19 Aug 5, 2024
e91cced
docs: write docs for streams + chatops
joshstevens19 Aug 5, 2024
a5a6c7c
fest: support index event in order with streams and chatops
joshstevens19 Aug 5, 2024
638e21d
refactor: chatops > chatbots
joshstevens19 Aug 5, 2024
5075c6a
feat: support aws config in the yaml itself to avoid magic environmen…
joshstevens19 Aug 5, 2024
9e14b53
feat: support custom functions in the template_inline
joshstevens19 Aug 6, 2024
027cd33
feat: support discord chatbot
joshstevens19 Aug 6, 2024
544f02e
feat: support slack chatbot
joshstevens19 Aug 6, 2024
f086c67
docs: update changelog.mdx
joshstevens19 Aug 6, 2024
6816d4d
cleanup
joshstevens19 Aug 6, 2024
6fda813
fix docs
joshstevens19 Aug 6, 2024
e62f675
docs: sort headers for stream and chatbots
joshstevens19 Aug 6, 2024
1bfed5c
feat: create .gitignore file for new projects
joshstevens19 Aug 6, 2024
775ba1c
feat: create .gitignore file for new projects
joshstevens19 Aug 6, 2024
08ae186
feat: create last synced blocks for streams
joshstevens19 Aug 6, 2024
d97063e
fix: last testing for streams
joshstevens19 Aug 6, 2024
a9271a7
docs: chatbots clean up
joshstevens19 Aug 6, 2024
89927a3
docs: slack cleanup
joshstevens19 Aug 6, 2024
3a0dfee
fix yaml
joshstevens19 Aug 6, 2024
f8751a6
format
joshstevens19 Aug 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ graphql/node_modules
rindexer_rust_playground/generated_csv/*/*.csv
.env
!.env.example
node_modules
2 changes: 2 additions & 0 deletions cli/src/commands/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub async fn handle_add_contract_command(
dependency_events: None,
reorg_safe_distance: None,
generate_csv: None,
streams: None,
chat: None,
});

write_manifest(&manifest, &rindexer_yaml_path).map_err(|e| {
Expand Down
2 changes: 2 additions & 0 deletions cli/src/commands/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ pub fn handle_new_command(
dependency_events: None,
reorg_safe_distance: None,
generate_csv: None,
streams: None,
chat: None,
}],
phantom: None,
global: None,
Expand Down
12 changes: 11 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,25 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "time"]
chrono = "0.4.38"
log = "0.4.20"
colored = "2.0"
hex = "0.4.3"
uuid = { version = "1.10.0", features = ["v4"] }
# do not change version as have to match ethers at the moment
reqwest = { version = "0.11.27", features = ["json"] }
thread_local = "1.1"
native-tls = "0.2"
postgres-native-tls = "0.5"
aws-config = "1.5.0"
aws-sdk-sns = "1.37.0"
lapin = "2.5.0"
deadpool = { version = "0.12", features = ["rt_tokio_1"] }
deadpool-lapin = "0.12"
rdkafka = { version = "0.36", features = ["tokio"] }
teloxide = "0.12"
serenity = { version = "0.12", features = ["client", "framework"] }

# build
jemallocator = { version = "0.5.0", optional = true }
jemalloc-ctl = { version = "0.5.0", optional = true }
hex = "0.4.3"

[profile.release]
lto = "fat"
Expand Down
315 changes: 315 additions & 0 deletions core/src/chat/clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
use std::sync::Arc;

use ethers::types::U64;
use futures::future::join_all;
use serde_json::Value;
use serenity::all::ChannelId;
use teloxide::types::ChatId;
use thiserror::Error;
use tokio::{
task,
task::{JoinError, JoinHandle},
};

use crate::{
chat::{
discord::{DiscordBot, DiscordError},
slack::{SlackBot, SlackError},
telegram::{TelegramBot, TelegramError},
template::Template,
},
event::{filter_event_data_by_conditions, EventMessage},
manifest::chat::{
ChatConfig, DiscordConfig, DiscordEvent, SlackConfig, SlackEvent, TelegramConfig,
TelegramEvent,
},
};

type SendMessage = Vec<JoinHandle<Result<(), ChatError>>>;

#[derive(Error, Debug)]
pub enum ChatError {
#[error("Telegram error: {0}")]
Telegram(#[from] TelegramError),

#[error("Discord error: {0}")]
Discord(#[from] DiscordError),

#[error("Slack error: {0}")]
Slack(#[from] SlackError),

#[error("Task failed: {0}")]
JoinError(JoinError),
}

#[derive(Debug, Clone)]
struct TelegramInstance {
config: TelegramConfig,
client: Arc<TelegramBot>,
}

#[derive(Debug)]
struct DiscordInstance {
config: DiscordConfig,
client: Arc<DiscordBot>,
}

#[derive(Debug)]
struct SlackInstance {
config: SlackConfig,
client: Arc<SlackBot>,
}

pub struct ChatClients {
telegram: Option<Vec<TelegramInstance>>,
discord: Option<Vec<DiscordInstance>>,
slack: Option<Vec<SlackInstance>>,
}

impl ChatClients {
pub async fn new(chat_config: ChatConfig) -> Self {
let telegram = chat_config.telegram.map(|config| {
config
.into_iter()
.map(|config| {
let client = Arc::new(TelegramBot::new(&config.bot_token));
TelegramInstance { config, client }
})
.collect()
});

let discord = chat_config.discord.map(|config| {
config
.into_iter()
.map(|config| {
let client = Arc::new(DiscordBot::new(&config.bot_token));
DiscordInstance { config, client }
})
.collect()
});

let slack = chat_config.slack.map(|config| {
config
.into_iter()
.map(|config| {
let client = Arc::new(SlackBot::new(config.bot_token.clone()));
SlackInstance { config, client }
})
.collect()
});

Self { telegram, discord, slack }
}

fn find_accepted_block_range(&self, from_block: &U64, to_block: &U64) -> U64 {
if from_block > to_block {
panic!("Invalid range: from_block must be less than or equal to to_block");
}

match from_block.overflowing_add(to_block - from_block) {
(result, false) => result,
(_, true) => U64::max_value(),
}
}

pub fn is_in_block_range_to_send(&self, from_block: &U64, to_block: &U64) -> bool {
// only 10 blocks at a time else rate limits will kick in
U64::from(10) <= self.find_accepted_block_range(from_block, to_block)
}

fn has_any_chat(&self) -> bool {
self.telegram.is_some()
}

fn telegram_send_message_tasks(
&self,
instance: &TelegramInstance,
event_for: &TelegramEvent,
events_data: &[Value],
) -> SendMessage {
let tasks: Vec<_> = events_data
.iter()
.filter(|event_data| {
if let Some(conditions) = &event_for.conditions {
filter_event_data_by_conditions(event_data, conditions)
} else {
true
}
})
.map(|event_data| {
let client = Arc::clone(&instance.client);
let chat_id = ChatId(instance.config.chat_id);
let message = Template::new(event_for.template_inline.clone())
.parse_template_inline(event_data);
task::spawn(async move {
client.send_message(chat_id, &message).await?;
Ok(())
})
})
.collect();
tasks
}

fn discord_send_message_tasks(
&self,
instance: &DiscordInstance,
event_for: &DiscordEvent,
events_data: &[Value],
) -> SendMessage {
let tasks: Vec<_> = events_data
.iter()
.filter(|event_data| {
if let Some(conditions) = &event_for.conditions {
filter_event_data_by_conditions(event_data, conditions)
} else {
true
}
})
.map(|event_data| {
let client = Arc::clone(&instance.client);
let channel_id = ChannelId::new(instance.config.channel_id);
let message = Template::new(event_for.template_inline.clone())
.parse_template_inline(event_data);
task::spawn(async move {
client.send_message(channel_id, &message).await?;
Ok(())
})
})
.collect();
tasks
}

fn slack_send_message_tasks(
&self,
instance: &SlackInstance,
event_for: &SlackEvent,
events_data: &[Value],
) -> SendMessage {
let tasks: Vec<_> = events_data
.iter()
.filter(|event_data| {
if let Some(conditions) = &event_for.conditions {
filter_event_data_by_conditions(event_data, conditions)
} else {
true
}
})
.map(|event_data| {
let client = Arc::clone(&instance.client);
let channel = instance.config.channel.clone();
let message = Template::new(event_for.template_inline.clone())
.parse_template_inline(event_data);
task::spawn(async move {
client.send_message(&channel, &message).await?;
Ok(())
})
})
.collect();
tasks
}

pub async fn send_message(
&self,
event_message: &EventMessage,
index_event_in_order: bool,
from_block: &U64,
to_block: &U64,
) -> Result<usize, ChatError> {
if !self.has_any_chat() || !self.is_in_block_range_to_send(from_block, to_block) {
return Ok(0);
}

// will always have something even if the event has no parameters due to the tx_information
if let Value::Array(data_array) = &event_message.event_data {
let mut messages: Vec<SendMessage> = Vec::new();

if let Some(telegram) = &self.telegram {
for instance in telegram {
if instance.config.networks.contains(&event_message.network) {
let telegram_event = instance
.config
.messages
.iter()
.find(|e| e.event_name == event_message.event_name);

if let Some(telegram_event) = telegram_event {
let message = self.telegram_send_message_tasks(
instance,
telegram_event,
data_array,
);
messages.push(message);
}
}
}
}

if let Some(discord) = &self.discord {
for instance in discord {
if instance.config.networks.contains(&event_message.network) {
let discord_event = instance
.config
.messages
.iter()
.find(|e| e.event_name == event_message.event_name);

if let Some(discord_event) = discord_event {
let message = self.discord_send_message_tasks(
instance,
discord_event,
data_array,
);
messages.push(message);
}
}
}
}

if let Some(slack) = &self.slack {
for instance in slack {
if instance.config.networks.contains(&event_message.network) {
let slack_event = instance
.config
.messages
.iter()
.find(|e| e.event_name == event_message.event_name);

if let Some(slack_event) = slack_event {
let message =
self.slack_send_message_tasks(instance, slack_event, data_array);
messages.push(message);
}
}
}
}

let mut messages_sent = 0;

if index_event_in_order {
for message in messages {
for publish in message {
match publish.await {
Ok(Ok(_)) => messages_sent += 1,
Ok(Err(e)) => return Err(e),
Err(e) => return Err(ChatError::JoinError(e)),
}
}
}
} else {
let tasks: Vec<_> = messages.into_iter().flatten().collect();
let results = join_all(tasks).await;
for result in results {
match result {
Ok(Ok(_)) => messages_sent += 1,
Ok(Err(e)) => return Err(e),
Err(e) => return Err(ChatError::JoinError(e)),
}
}
}

Ok(messages_sent)
} else {
unreachable!("Event data should be an array");
}
}
}
29 changes: 29 additions & 0 deletions core/src/chat/discord.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use serenity::{http::Http, model::id::ChannelId};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum DiscordError {
#[error("Discord API error: {0}")]
ApiError(#[from] serenity::Error),
}

#[derive(Debug)]
pub struct DiscordBot {
http: Http,
}

impl DiscordBot {
pub fn new(token: &str) -> Self {
let http = Http::new(token);
Self { http }
}

pub async fn send_message(
&self,
channel_id: ChannelId,
message: &str,
) -> Result<(), DiscordError> {
channel_id.say(&self.http, message).await?;
Ok(())
}
}
Loading
Loading