Skip to content

Commit

Permalink
feat: Enhance Kafka setup and improve service dependencies
Browse files Browse the repository at this point in the history
- Replace manual Kafka setup with services-flake's apache-kafka service
- Add Zookeeper service as a dependency for Kafka
- Introduce kafkaPreStartup script to create the chat_topic
- Update PostgreSQL service name from "pg1" to "pg" for consistency
- Refine service dependencies:
  - Kafka now depends on Zookeeper
  - Topos depends on both PostgreSQL and Kafka
- Remove redundant kafkaSetupHook
- Adjust Kafka settings for simplified configuration
- Include JVM options for Kafka to set heap size

These changes streamline the service setup process and ensure proper
initialization order, improving the overall reliability and maintainability
of the development environment.
  • Loading branch information
G-structure committed Oct 16, 2024
1 parent 02fdaac commit 63d14e5
Showing 1 changed file with 46 additions and 44 deletions.
90 changes: 46 additions & 44 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
configFile = pkgs.copyPathToStore ./config.yaml;
yq = pkgs.yq-go;

kafkaPreStartup = ''
echo "Kafka is ready. Creating topic..."
${pkgs.apacheKafka}/bin/kafka-topics.sh --create --topic chat_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --if-not-exists
'';

# Note: This only loads the settings from the repos config file
# if one is not already set in the user's .config directory.
toposSetupHook = ''
Expand All @@ -85,48 +90,7 @@
else
echo "Config file already exists at $TOPOS_CONFIG_PATH"
fi
'';

kafkaSetupHook = ''
echo "Starting Kafka in Kraft mode..."
# Set up necessary environment variables
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
export KAFKA_KRAFT_MODE=true
echo ${pkgs.apacheKafka}
# Prepare a default config for Kraft mode
if [ ! -f ./kafka.properties ]; then
echo "Initializing Kafka Kraft mode..."
# Server 1 Kraft
cp ${pkgs.apacheKafka}/config/kraft/server.properties ./server-1.properties
sudo sed -i '57!s/PLAINTEXT/MQ/g' server-1.properties
sudo sed -i '30s/.*/controller.quorum.voters=1@localhost:9091/' server-1.properties
sudo sed -i '78s|.*|log.dirs=/tmp/kraft-combined-logs/server-1|' server-1.properties
sudo sed -i '27s|.*|node.id=1|' server-1.properties
sudo sed -i '42s|.*|listeners=MQ://:9092,CONTROLLER://:9091|' server-1.properties
sudo sed -i '92s|.*|offsets.topic.replication.factor=1|' server-1.properties
sudo sed -i '57s|.*|listener.security.protocol.map=CONTROLLER:PLAINTEXT,MQ:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL|' server-1.properties
fi
# Step 1
KAFKA_CLUSTER_ID="$(${pkgs.apacheKafka}/bin/kafka-storage.sh random-uuid)"
# Step 2
${pkgs.apacheKafka}/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c ./server-1.properties
# Step 3
${pkgs.apacheKafka}/bin/kafka-server-start.sh ./server-1.properties &
# Step 4
echo "Kafka environment is ready to use and running in detached terminals."
# Step 5
${pkgs.apacheKafka}/bin/kafka-topics.sh --create --topic chat_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
sleep 3
${kafkaPreStartup}
'';

in
Expand Down Expand Up @@ -156,7 +120,7 @@
models = [ "phi3" ];
};

postgres."pg1" = {
postgres."pg" = {
enable = true;
package = pkgs.postgresql_16.withPackages (p: [ p.pgvector ]);
port = 5432;
Expand Down Expand Up @@ -203,10 +167,48 @@
};
};

zookeeper."zookeeper".enable = true;

apache-kafka."kafka" = {
enable = true;
port = 9092;
settings = {
"offsets.topic.replication.factor" = 1;
"zookeeper.connect" = [ "localhost:2181" ];
};
# settings = {
# "broker.id" = 1;
# "log.dirs" = [ "/tmp/kraft-combined-logs/server-1" ];
# "listeners" = [ "PLAINTEXT://localhost:9092" "CONTROLLER://localhost:9091" ];
# "listener.security.protocol.map" = "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT";
# "advertised.listeners" = "PLAINTEXT://localhost:9092";
# "controller.quorum.voters" = "1@localhost:9091";
# "controller.listener.names" = "CONTROLLER";
# "process.roles" = "broker,controller";
# "node.id" = 1;
# "offsets.topic.replication.factor" = 1;
# "transaction.state.log.replication.factor" = 1;
# "transaction.state.log.min.isr" = 1;
# "auto.create.topics.enable" = true;
# "num.partitions" = 1;
# };
# clusterId = "$(${pkgs.apacheKafka}/bin/kafka-storage.sh random-uuid)";
formatLogDirs = true;
formatLogDirsIgnoreFormatted = true;
jvmOptions = [
"-Xmx512M"
"-Xms512M"
];
};

topos.enable = true;
topos.args = [ "run" ];
};
settings.processes.topos.depends_on.pg1.condition = "process_healthy";
settings.processes = {
kafka.depends_on."zookeeper".condition = "process_healthy";
topos.depends_on.pg.condition = "process_healthy";
topos.depends_on.kafka.condition = "process_healthy";
};
};

packages = rec {
Expand Down

0 comments on commit 63d14e5

Please sign in to comment.